Coverage Report

Created: 2026-06-23 14:43

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/check.h"
46
#include "common/config.h"
47
#include "common/logging.h"
48
#include "core/uint128.h"
49
#include "exec/common/sip_hash.h"
50
#include "io/cache/block_file_cache_ttl_mgr.h"
51
#include "io/cache/file_block.h"
52
#include "io/cache/file_cache_common.h"
53
#include "io/cache/fs_file_cache_storage.h"
54
#include "io/cache/mem_file_cache_storage.h"
55
#include "runtime/runtime_profile.h"
56
#include "util/concurrency_stats.h"
57
#include "util/stack_util.h"
58
#include "util/stopwatch.hpp"
59
#include "util/thread.h"
60
#include "util/time.h"
61
namespace doris::io {
62
63
namespace {
64
65
constexpr std::array<FileCacheType, 4> LRU_LOG_REPLAY_TYPES = {
66
        FileCacheType::TTL, FileCacheType::INDEX, FileCacheType::NORMAL, FileCacheType::DISPOSABLE};
67
68
752
size_t file_cache_type_index(FileCacheType type) {
69
752
    return static_cast<size_t>(type);
70
752
}
71
72
} // namespace
73
74
// Insert a block pointer into one shard while swallowing allocation failures.
75
4.34M
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block, size_t max_queue_size) {
76
4.34M
    if (!block || max_queue_size == 0) {
77
1
        return false;
78
1
    }
79
4.34M
    bool reserved = false;
80
4.34M
    try {
81
4.34M
        auto* raw_ptr = block.get();
82
4.34M
        auto idx = shard_index(raw_ptr);
83
4.34M
        auto& shard = _shards[idx];
84
4.34M
        std::lock_guard lock(shard.mutex);
85
4.34M
        if (shard.entries.contains(raw_ptr)) {
86
4.22M
            return false;
87
4.22M
        }
88
125k
        size_t cur_size = _size.load(std::memory_order_relaxed);
89
140k
        while (cur_size < max_queue_size) {
90
140k
            if (_size.compare_exchange_weak(cur_size, cur_size + 1, std::memory_order_relaxed)) {
91
140k
                reserved = true;
92
140k
                break;
93
140k
            }
94
140k
        }
95
125k
        if (!reserved) {
96
1
            return false;
97
1
        }
98
125k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
99
125k
        DORIS_CHECK(inserted);
100
125k
        return true;
101
125k
    } catch (const std::exception& e) {
102
0
        if (reserved) {
103
0
            decrease_size(1);
104
0
        }
105
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
106
0
    } catch (...) {
107
0
        if (reserved) {
108
0
            decrease_size(1);
109
0
        }
110
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
111
0
    }
112
0
    return false;
113
4.34M
}
114
115
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
116
4.72k
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
117
4.72k
    if (limit == 0 || output == nullptr) {
118
2
        return 0;
119
2
    }
120
4.72k
    size_t drained = 0;
121
4.72k
    try {
122
4.72k
        output->reserve(output->size() + std::min(limit, size()));
123
302k
        for (auto& shard : _shards) {
124
302k
            if (drained >= limit) {
125
1
                break;
126
1
            }
127
302k
            std::lock_guard lock(shard.mutex);
128
302k
            auto it = shard.entries.begin();
129
302k
            size_t shard_drained = 0;
130
440k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
131
138k
                output->emplace_back(std::move(it->second));
132
138k
                it = shard.entries.erase(it);
133
138k
                ++shard_drained;
134
138k
            }
135
302k
            if (shard_drained > 0) {
136
798
                decrease_size(shard_drained);
137
798
                drained += shard_drained;
138
798
            }
139
302k
        }
140
4.72k
    } catch (const std::exception& e) {
141
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
142
0
    } catch (...) {
143
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
144
0
    }
145
4.72k
    return drained;
146
4.72k
}
147
148
// Remove every pending block, guarding against unexpected exceptions.
149
34
void NeedUpdateLRUBlocks::clear() {
150
34
    try {
151
2.17k
        for (auto& shard : _shards) {
152
2.17k
            std::lock_guard lock(shard.mutex);
153
2.17k
            if (!shard.entries.empty()) {
154
2
                auto removed = shard.entries.size();
155
2
                shard.entries.clear();
156
2
                decrease_size(removed);
157
2
            }
158
2.17k
        }
159
34
    } catch (const std::exception& e) {
160
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
161
0
    } catch (...) {
162
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
163
0
    }
164
34
}
165
166
800
void NeedUpdateLRUBlocks::decrease_size(size_t delta) {
167
800
    size_t cur_size = _size.load(std::memory_order_relaxed);
168
800
    while (true) {
169
800
        DORIS_CHECK_GE(cur_size, delta);
170
800
        if (_size.compare_exchange_weak(cur_size, cur_size - delta, std::memory_order_relaxed)) {
171
800
            return;
172
800
        }
173
800
    }
174
800
}
175
176
4.34M
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
177
4.34M
    DCHECK(ptr != nullptr);
178
4.34M
    return std::hash<FileBlock*> {}(ptr)&kShardMask;
179
4.34M
}
180
181
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
182
                               const FileCacheSettings& cache_settings)
183
188
        : _cache_base_path(cache_base_path),
184
188
          _capacity(cache_settings.capacity),
185
188
          _max_file_block_size(cache_settings.max_file_block_size) {
186
188
    _cur_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(_cache_base_path.c_str(),
187
188
                                                                     "file_cache_cache_size", 0);
188
188
    _cache_capacity_metrics = std::make_shared<bvar::Status<size_t>>(
189
188
            _cache_base_path.c_str(), "file_cache_capacity", _capacity);
190
188
    _cur_ttl_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
191
188
            _cache_base_path.c_str(), "file_cache_ttl_cache_size", 0);
192
188
    _cur_normal_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
193
188
            _cache_base_path.c_str(), "file_cache_normal_queue_element_count", 0);
194
188
    _cur_ttl_cache_lru_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
195
188
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_size", 0);
196
188
    _cur_ttl_cache_lru_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
197
188
            _cache_base_path.c_str(), "file_cache_ttl_cache_lru_queue_element_count", 0);
198
188
    _cur_normal_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
199
188
            _cache_base_path.c_str(), "file_cache_normal_queue_cache_size", 0);
200
188
    _cur_index_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
201
188
            _cache_base_path.c_str(), "file_cache_index_queue_element_count", 0);
202
188
    _cur_index_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
203
188
            _cache_base_path.c_str(), "file_cache_index_queue_cache_size", 0);
204
188
    _cur_disposable_queue_element_count_metrics = std::make_shared<bvar::Status<size_t>>(
205
188
            _cache_base_path.c_str(), "file_cache_disposable_queue_element_count", 0);
206
188
    _cur_disposable_queue_cache_size_metrics = std::make_shared<bvar::Status<size_t>>(
207
188
            _cache_base_path.c_str(), "file_cache_disposable_queue_cache_size", 0);
208
209
188
    _queue_evict_size_metrics[0] = std::make_shared<bvar::Adder<size_t>>(
210
188
            _cache_base_path.c_str(), "file_cache_index_queue_evict_size");
211
188
    _queue_evict_size_metrics[1] = std::make_shared<bvar::Adder<size_t>>(
212
188
            _cache_base_path.c_str(), "file_cache_normal_queue_evict_size");
213
188
    _queue_evict_size_metrics[2] = std::make_shared<bvar::Adder<size_t>>(
214
188
            _cache_base_path.c_str(), "file_cache_disposable_queue_evict_size");
215
188
    _queue_evict_size_metrics[3] = std::make_shared<bvar::Adder<size_t>>(
216
188
            _cache_base_path.c_str(), "file_cache_ttl_cache_evict_size");
217
188
    _total_evict_size_metrics = std::make_shared<bvar::Adder<size_t>>(
218
188
            _cache_base_path.c_str(), "file_cache_total_evict_size");
219
188
    _total_read_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
220
188
                                                                     "file_cache_total_read_size");
221
188
    _total_hit_size_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
222
188
                                                                    "file_cache_total_hit_size");
223
188
    _gc_evict_bytes_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
224
188
                                                                    "file_cache_gc_evict_bytes");
225
188
    _gc_evict_count_metrics = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
226
188
                                                                    "file_cache_gc_evict_count");
227
228
188
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
229
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
230
188
                                                  "file_cache_evict_by_time_disposable_to_normal");
231
188
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
232
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
233
188
                                                  "file_cache_evict_by_time_disposable_to_index");
234
188
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
235
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
236
188
                                                  "file_cache_evict_by_time_disposable_to_ttl");
237
188
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
238
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
239
188
                                                  "file_cache_evict_by_time_normal_to_disposable");
240
188
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
241
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
242
188
                                                  "file_cache_evict_by_time_normal_to_index");
243
188
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
244
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
245
188
                                                  "file_cache_evict_by_time_normal_to_ttl");
246
188
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
247
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
248
188
                                                  "file_cache_evict_by_time_index_to_disposable");
249
188
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
250
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
251
188
                                                  "file_cache_evict_by_time_index_to_normal");
252
188
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
253
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
254
188
                                                  "file_cache_evict_by_time_index_to_ttl");
255
188
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
256
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
257
188
                                                  "file_cache_evict_by_time_ttl_to_disposable");
258
188
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
259
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
260
188
                                                  "file_cache_evict_by_time_ttl_to_normal");
261
188
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
262
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
263
188
                                                  "file_cache_evict_by_time_ttl_to_index");
264
265
188
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
266
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
267
188
                                                  "file_cache_evict_by_self_lru_disposable");
268
188
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
269
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
270
188
                                                  "file_cache_evict_by_self_lru_normal");
271
188
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] = std::make_shared<bvar::Adder<size_t>>(
272
188
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_index");
273
188
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] = std::make_shared<bvar::Adder<size_t>>(
274
188
            _cache_base_path.c_str(), "file_cache_evict_by_self_lru_ttl");
275
276
188
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
277
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
278
188
                                                  "file_cache_evict_by_size_disposable_to_normal");
279
188
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
280
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
281
188
                                                  "file_cache_evict_by_size_disposable_to_index");
282
188
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
283
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
284
188
                                                  "file_cache_evict_by_size_disposable_to_ttl");
285
188
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
286
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
287
188
                                                  "file_cache_evict_by_size_normal_to_disposable");
288
188
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
289
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
290
188
                                                  "file_cache_evict_by_size_normal_to_index");
291
188
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
292
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
293
188
                                                  "file_cache_evict_by_size_normal_to_ttl");
294
188
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
295
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
296
188
                                                  "file_cache_evict_by_size_index_to_disposable");
297
188
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
298
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
299
188
                                                  "file_cache_evict_by_size_index_to_normal");
300
188
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
301
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
302
188
                                                  "file_cache_evict_by_size_index_to_ttl");
303
188
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
304
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
305
188
                                                  "file_cache_evict_by_size_ttl_to_disposable");
306
188
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
307
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
308
188
                                                  "file_cache_evict_by_size_ttl_to_normal");
309
188
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
310
188
            std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
311
188
                                                  "file_cache_evict_by_size_ttl_to_index");
312
313
188
    _evict_by_try_release = std::make_shared<bvar::Adder<size_t>>(
314
188
            _cache_base_path.c_str(), "file_cache_evict_by_try_release");
315
316
188
    _num_read_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
317
188
                                                             "file_cache_num_read_blocks");
318
188
    _num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
319
188
                                                            "file_cache_num_hit_blocks");
320
188
    _num_removed_blocks = std::make_shared<bvar::Adder<size_t>>(_cache_base_path.c_str(),
321
188
                                                                "file_cache_num_removed_blocks");
322
323
188
    _no_warmup_num_read_blocks = std::make_shared<bvar::Adder<size_t>>(
324
188
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks");
325
188
    _no_warmup_num_hit_blocks = std::make_shared<bvar::Adder<size_t>>(
326
188
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks");
327
328
188
#ifndef BE_TEST
329
188
    _num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
330
188
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
331
188
    _num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
332
188
            _cache_base_path.c_str(), "file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
333
188
    _num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
334
188
            _cache_base_path.c_str(), "file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
335
188
    _num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
336
188
            _cache_base_path.c_str(), "file_cache_num_read_blocks_1h", _num_read_blocks.get(),
337
188
            3600);
338
188
    _no_warmup_num_hit_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
339
188
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_5m",
340
188
            _no_warmup_num_hit_blocks.get(), 300);
341
188
    _no_warmup_num_read_blocks_5m = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
342
188
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_5m",
343
188
            _no_warmup_num_read_blocks.get(), 300);
344
188
    _no_warmup_num_hit_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
345
188
            _cache_base_path.c_str(), "file_cache_no_warmup_num_hit_blocks_1h",
346
188
            _no_warmup_num_hit_blocks.get(), 3600);
347
188
    _no_warmup_num_read_blocks_1h = std::make_shared<bvar::Window<bvar::Adder<size_t>>>(
348
188
            _cache_base_path.c_str(), "file_cache_no_warmup_num_read_blocks_1h",
349
188
            _no_warmup_num_read_blocks.get(), 3600);
350
188
#endif
351
352
188
    _hit_ratio = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
353
188
                                                        "file_cache_hit_ratio", 0.0);
354
188
    _hit_ratio_5m = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
355
188
                                                           "file_cache_hit_ratio_5m", 0.0);
356
188
    _hit_ratio_1h = std::make_shared<bvar::Status<double>>(_cache_base_path.c_str(),
357
188
                                                           "file_cache_hit_ratio_1h", 0.0);
358
359
188
    _no_warmup_hit_ratio = std::make_shared<bvar::Status<double>>(
360
188
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio", 0.0);
361
188
    _no_warmup_hit_ratio_5m = std::make_shared<bvar::Status<double>>(
362
188
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_5m", 0.0);
363
188
    _no_warmup_hit_ratio_1h = std::make_shared<bvar::Status<double>>(
364
188
            _cache_base_path.c_str(), "file_cache_no_warmup_hit_ratio_1h", 0.0);
365
366
188
    _disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t>>(
367
188
            _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0);
368
188
    _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t>>(
369
188
            _cache_base_path.c_str(), "file_cache_need_evict_cache_in_advance", 0);
370
188
    _meta_store_write_queue_size_metrics = std::make_shared<bvar::Status<size_t>>(
371
188
            _cache_base_path.c_str(), "file_cache_meta_store_write_queue_size", 0);
372
373
188
    _cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
374
188
            _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us");
375
188
    _get_or_set_latency_us = std::make_shared<bvar::LatencyRecorder>(
376
188
            _cache_base_path.c_str(), "file_cache_get_or_set_latency_us");
377
188
    _storage_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
378
188
            _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us");
379
188
    _storage_retry_sync_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
380
188
            _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us");
381
188
    _storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
382
188
            _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us");
383
188
    _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
384
188
            _cache_base_path.c_str(), "file_cache_evict_in_advance_latency_us");
385
188
    _lru_dump_latency_us = std::make_shared<bvar::LatencyRecorder>(
386
188
            _cache_base_path.c_str(), "file_cache_lru_dump_latency_us");
387
188
    _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
388
188
            _cache_base_path.c_str(), "file_cache_recycle_keys_length");
389
188
    _need_update_lru_blocks_length_recorder = std::make_shared<bvar::LatencyRecorder>(
390
188
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_length");
391
188
    _need_update_lru_blocks_produce_metrics = std::make_shared<bvar::Adder<size_t>>(
392
188
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_produce");
393
188
    _need_update_lru_blocks_consume_metrics = std::make_shared<bvar::Adder<size_t>>(
394
188
            _cache_base_path.c_str(), "file_cache_need_update_lru_blocks_consume");
395
188
    _update_lru_blocks_latency_us = std::make_shared<bvar::LatencyRecorder>(
396
188
            _cache_base_path.c_str(), "file_cache_update_lru_blocks_latency_us");
397
188
    _ttl_gc_latency_us = std::make_shared<bvar::LatencyRecorder>(_cache_base_path.c_str(),
398
188
                                                                 "file_cache_ttl_gc_latency_us");
399
188
    _shadow_queue_levenshtein_distance = std::make_shared<bvar::LatencyRecorder>(
400
188
            _cache_base_path.c_str(), "file_cache_shadow_queue_levenshtein_distance");
401
188
    for (FileCacheType type : {FileCacheType::DISPOSABLE, FileCacheType::NORMAL,
402
752
                               FileCacheType::INDEX, FileCacheType::TTL}) {
403
752
        size_t idx = file_cache_type_index(type);
404
752
        std::string metric_prefix =
405
752
                "file_cache_lru_recorder_" + cache_type_to_string(type) + "_record_queue";
406
752
        _lru_recorder_queue_length_recorder[idx] = std::make_shared<bvar::LatencyRecorder>(
407
752
                _cache_base_path.c_str(), metric_prefix + "_length");
408
752
        _lru_recorder_queue_produce_metrics[idx] = std::make_shared<bvar::Adder<size_t>>(
409
752
                _cache_base_path.c_str(), metric_prefix + "_produce");
410
752
        _lru_recorder_queue_consume_metrics[idx] = std::make_shared<bvar::Adder<size_t>>(
411
752
                _cache_base_path.c_str(), metric_prefix + "_consume");
412
752
    }
413
188
    _lru_recorder_log_replay_idle_metrics = std::make_shared<bvar::Adder<size_t>>(
414
188
            _cache_base_path.c_str(), "file_cache_lru_recorder_log_replay_idle");
415
416
188
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
417
188
                                 cache_settings.disposable_queue_elements, 60 * 60);
418
188
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
419
188
                            7 * 24 * 60 * 60);
420
188
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
421
188
                             24 * 60 * 60);
422
188
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
423
188
                          std::numeric_limits<int>::max());
424
425
188
    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
426
188
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());
427
188
    if (cache_settings.storage == "memory") {
428
24
        _storage = std::make_unique<MemFileCacheStorage>();
429
24
        _cache_base_path = "memory";
430
164
    } else {
431
164
        _storage = std::make_unique<FSFileCacheStorage>();
432
164
    }
433
434
188
    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
435
188
}
436
437
495k
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
438
495k
    uint128_t value;
439
495k
    sip_hash128(path.data(), path.size(), reinterpret_cast<char*>(&value));
440
495k
    return UInt128Wrapper(value);
441
495k
}
442
443
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
444
28
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
445
28
    SCOPED_CACHE_LOCK(_mutex, this);
446
28
    if (!config::enable_file_cache_query_limit) {
447
1
        return {};
448
1
    }
449
450
    /// if enable_filesystem_query_cache_limit is true,
451
    /// we create context query for current query.
452
27
    auto context = get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
453
27
    return std::make_unique<QueryFileCacheContextHolder>(query_id, this, context);
454
28
}
455
456
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
457
15.0k
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
458
15.0k
    auto query_iter = _query_map.find(query_id);
459
15.0k
    return (query_iter == _query_map.end()) ? nullptr : query_iter->second;
460
15.0k
}
461
462
26
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
463
26
    SCOPED_CACHE_LOCK(_mutex, this);
464
26
    const auto& query_iter = _query_map.find(query_id);
465
466
26
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
467
25
        _query_map.erase(query_iter);
468
25
    }
469
26
}
470
471
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
472
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
473
27
        int file_cache_query_limit_percent) {
474
27
    if (query_id.lo == 0 && query_id.hi == 0) {
475
1
        return nullptr;
476
1
    }
477
478
26
    auto context = get_query_context(query_id, cache_lock);
479
26
    if (context) {
480
1
        return context;
481
1
    }
482
483
25
    size_t file_cache_query_limit_size = _capacity * file_cache_query_limit_percent / 100;
484
25
    if (file_cache_query_limit_size < 268435456) {
485
25
        LOG(WARNING) << "The user-set file cache query limit (" << file_cache_query_limit_size
486
25
                     << " bytes) is less than the 256MB recommended minimum. "
487
25
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
488
25
                     << " from its current value " << file_cache_query_limit_percent << "%.";
489
25
    }
490
25
    auto query_context = std::make_shared<QueryFileCacheContext>(file_cache_query_limit_size);
491
25
    auto query_iter = _query_map.emplace(query_id, query_context).first;
492
25
    return query_iter->second;
493
26
}
494
495
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
496
140
                                                   std::lock_guard<std::mutex>& cache_lock) {
497
140
    auto pair = std::make_pair(hash, offset);
498
140
    auto record = records.find(pair);
499
140
    DCHECK(record != records.end());
500
140
    auto iter = record->second;
501
140
    records.erase(pair);
502
140
    lru_queue.remove(iter, cache_lock);
503
140
}
504
505
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
506
                                                    size_t size,
507
190
                                                    std::lock_guard<std::mutex>& cache_lock) {
508
190
    auto pair = std::make_pair(hash, offset);
509
190
    if (records.find(pair) == records.end()) {
510
189
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
511
189
        records.insert({pair, queue_iter});
512
189
    }
513
190
}
514
515
163
// Locked entry point for initialization: acquires the cache lock through
// SCOPED_CACHE_LOCK (which provides `cache_lock`) and forwards to
// initialize_unlocked(), returning its Status unchanged.
Status BlockFileCache::initialize() {
516
163
    SCOPED_CACHE_LOCK(_mutex, this);
517
163
    return initialize_unlocked(cache_lock);
518
163
}
519
520
163
/// Performs the one-time initialization of the cache; the caller passes the cache lock.
///
/// Steps, in order:
///   1. mark the cache as initialized (must not have been initialized before);
///   2. optionally restore the LRU queues from disk;
///   3. initialize the storage backend (its error, if any, is returned);
///   4. create the TTL manager when the storage is file-system based and has a meta store;
///   5. start the background worker threads.
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(!_is_initialized);
    _is_initialized = true;

    const bool restore_lru_dump = config::file_cache_background_lru_dump_tail_record_num > 0;
    if (restore_lru_dump) {
        // Requirements:
        // 1. restored data should not overwrite the last dump
        // 2. restore should happen before load and async load
        // 3. all queues should be restored sequentially to avoid conflicts
        // TODO(zhengyu): we could parallelize them, but that increases complexity, so let's
        // check the time cost first to see whether any improvement is necessary
        restore_lru_queues_from_disk(cache_lock);
    }

    RETURN_IF_ERROR(_storage->init(this));

    // The TTL manager is only available for file-system storage that exposes a meta store.
    auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
    auto* meta_store = (fs_storage != nullptr) ? fs_storage->get_meta_store() : nullptr;
    if (meta_store != nullptr) {
        _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
    }

    // Background workers.
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
    _cache_background_evict_in_advance_thread =
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
    _cache_background_block_lru_update_thread =
            std::thread(&BlockFileCache::run_background_block_lru_update, this);

    // LRU dump and LRU log replay workers.
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
    _cache_background_lru_log_replay_thread =
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);

    return Status::OK();
}
554
555
void BlockFileCache::update_block_lru(FileBlockSPtr block,
556
138k
                                      std::lock_guard<std::mutex>& cache_lock) {
557
138k
    if (!block) {
558
1
        return;
559
1
    }
560
561
138k
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
562
138k
    if (!cell || cell->file_block.get() != block.get()) {
563
15.5k
        return;
564
15.5k
    }
565
566
122k
    if (cell->queue_iterator) {
567
122k
        auto& queue = get_queue(block->cache_type());
568
122k
        queue.move_to_end(*cell->queue_iterator, cache_lock);
569
122k
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
570
122k
                                          block->_key.hash, block->_key.offset,
571
122k
                                          block->_block_range.size());
572
122k
    }
573
122k
    cell->update_atime();
574
122k
}
575
576
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
577
743k
                              std::lock_guard<std::mutex>& cache_lock) {
578
743k
    if (result) {
579
743k
        result->push_back(cell.file_block);
580
743k
    }
581
582
743k
    auto& queue = get_queue(cell.file_block->cache_type());
583
    /// Move to the end of the queue. The iterator remains valid.
584
743k
    if (!config::enable_file_cache_async_touch_on_get_or_set && cell.queue_iterator &&
585
743k
        move_iter_flag) {
586
549k
        queue.move_to_end(*cell.queue_iterator, cache_lock);
587
549k
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
588
549k
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
589
549k
                                          cell.file_block->_key.offset, cell.size());
590
549k
    }
591
592
743k
    cell.update_atime();
593
743k
}
594
595
template <class T>
596
    requires IsXLock<T>
597
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
598
19.3M
                                        T& /* cache_lock */) {
599
19.3M
    auto it = _files.find(hash);
600
19.3M
    if (it == _files.end()) {
601
2.24k
        return nullptr;
602
2.24k
    }
603
604
19.3M
    auto& offsets = it->second;
605
19.3M
    auto cell_it = offsets.find(offset);
606
19.3M
    if (cell_it == offsets.end()) {
607
237
        return nullptr;
608
237
    }
609
610
19.3M
    return &cell_it->second;
611
19.3M
}
612
613
935k
/// Returns true only when neither the cell's cache type nor the querying cache type
/// is DISPOSABLE.
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
    const bool involves_disposable = (query_type == FileCacheType::DISPOSABLE) ||
                                     (cell_type == FileCacheType::DISPOSABLE);
    return !involves_disposable;
}
616
617
/// Given range = [left, right] and the non-overlapping, ordered set of file blocks cached
/// for `hash`, returns the list [block1, ..., blockN] of blocks intersecting the range.
///
/// When `hash` is not in `_files` and the asynchronous open has not finished, the blocks
/// for the key are first loaded directly from storage. Every returned cell is passed
/// through use_cell(), which may touch its LRU position and access time.
/// Returns an empty list when nothing intersects.
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
                                    const FileBlock::Range& range,
                                    std::lock_guard<std::mutex>& cache_lock) {
    auto file_it = _files.find(hash);
    if (file_it == _files.end()) {
        if (_async_open_done) {
            return {};
        }
        // Async open still in progress: try to load this key's blocks on demand.
        FileCacheKey key;
        key.hash = hash;
        key.meta.type = context.cache_type;
        key.meta.expiration_time = context.expiration_time;
        key.meta.tablet_id = context.tablet_id;
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);

        file_it = _files.find(hash);
        if (file_it == _files.end()) [[unlikely]] {
            return {};
        }
    }

    auto& cells = file_it->second;
    if (cells.empty()) {
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                     << " cache type=" << context.cache_type
                     << " cache expiration time=" << context.expiration_time
                     << " cache range=" << range.left << " " << range.right
                     << " query id=" << context.query_id;
        DCHECK(false);
        _files.erase(hash);
        return {};
    }

    FileBlocks result;
    // Appends `cell` to the result and applies the LRU-touch policy for this request.
    const auto collect = [&](const FileBlockCell& cell) {
        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
                 cache_lock);
    };

    auto block_it = cells.lower_bound(range.left);
    if (block_it == cells.end()) {
        /// N - last cached block for given file hash, block{N}.offset < range.left:
        ///   block{N}                       block{N}
        /// [________                         [_______]
        ///     [__________]         OR                  [________]
        ///     ^                                        ^
        ///     range.left                               range.left
        const auto& last_cell = cells.rbegin()->second;
        if (last_cell.file_block->range().right < range.left) {
            return {};
        }
        collect(last_cell);
        return result;
    }

    /// block_it <-- block{k}
    if (block_it != cells.begin()) {
        const auto& prev_cell = std::prev(block_it)->second;
        if (range.left <= prev_cell.file_block->range().right) {
            ///   block{k-1}  block{k}
            ///   [________]   [_____
            ///       [___________
            ///       ^
            ///       range.left
            collect(prev_cell);
        }
    }

    ///  block{k} ...       block{k-1}  block{k}                      block{k}
    ///  [______              [______]     [____                        [________
    ///  [_________     OR              [________      OR    [______]   ^
    ///  ^                              ^                           ^   block{k}.offset
    ///  range.left                     range.left                  range.right
    for (; block_it != cells.end(); ++block_it) {
        const auto& cell = block_it->second;
        if (range.right < cell.file_block->range().left) {
            break;
        }
        collect(cell);
    }

    return result;
}
707
708
4.34M
/// Queues `block` for a later LRU update. The queue size limit comes from
/// config::file_cache_background_block_lru_update_queue_max_size; non-positive values
/// are passed on as 0. Metrics are updated only when the insert reports success.
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
    const int64_t configured_limit = config::file_cache_background_block_lru_update_queue_max_size;
    size_t queue_capacity = 0;
    if (configured_limit > 0) {
        queue_capacity = static_cast<size_t>(configured_limit);
    }

    if (!_need_update_lru_blocks.insert(std::move(block), queue_capacity)) {
        return;
    }
    *_need_update_lru_blocks_produce_metrics << 1;
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
716
717
4
// Clears the file cache by calling clear_file_cache_impl() with sync_remove == false
// and returns the summary string produced by that call.
std::string BlockFileCache::clear_file_cache_async() {
718
4
    return clear_file_cache_impl(false);
719
4
}
720
721
28
// Clears the file cache by calling clear_file_cache_impl() with sync_remove == true
// and returns the summary string produced by that call.
std::string BlockFileCache::clear_file_cache_sync() {
722
28
    return clear_file_cache_impl(true);
723
28
}
724
725
32
std::string BlockFileCache::clear_file_cache_impl(bool sync_remove) {
726
32
    const char* action = sync_remove ? "clear_file_cache_sync" : "clear_file_cache_async";
727
32
    LOG(INFO) << "start " << action << ", path=" << _cache_base_path;
728
32
    _lru_dumper->remove_lru_dump_files();
729
32
    int64_t num_cells_all = 0;
730
32
    int64_t num_cells_to_delete = 0;
731
32
    int64_t num_cells_wait_recycle = 0;
732
32
    int64_t num_files_all = 0;
733
32
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
734
32
    {
735
32
        SCOPED_CACHE_LOCK(_mutex, this);
736
737
32
        std::vector<FileBlockCell*> deleting_cells;
738
5.18k
        for (auto& [_, offset_to_cell] : _files) {
739
5.18k
            ++num_files_all;
740
105k
            for (auto& [_1, cell] : offset_to_cell) {
741
105k
                ++num_cells_all;
742
105k
                deleting_cells.push_back(&cell);
743
105k
            }
744
5.18k
        }
745
746
        // Do not erase while walking _files above: remove() may erase the current map element.
747
        //
748
        // sync_remove only changes how already releasable DOWNLOADED blocks are deleted from
749
        // storage. Busy blocks keep the existing holder lifecycle: mark them deleting and leave
750
        // them in _files until the last holder releases them.
751
105k
        for (auto& cell : deleting_cells) {
752
105k
            if (!cell->releasable()) {
753
103
                LOG(INFO) << "cell is not releasable, hash="
754
103
                          << " offset=" << cell->file_block->offset();
755
103
                cell->file_block->set_deleting();
756
103
                ++num_cells_wait_recycle;
757
103
                continue;
758
103
            }
759
105k
            FileBlockSPtr file_block = cell->file_block;
760
105k
            if (file_block) {
761
105k
                std::lock_guard block_lock(file_block->_mutex);
762
105k
                remove(file_block, cache_lock, block_lock, sync_remove);
763
105k
                ++num_cells_to_delete;
764
105k
            }
765
105k
        }
766
32
        clear_need_update_lru_blocks();
767
32
    }
768
769
32
    std::stringstream ss;
770
32
    ss << "finish " << action << ", path=" << _cache_base_path << " sync_remove=" << sync_remove
771
32
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
772
32
       << " num_cells_to_delete=" << num_cells_to_delete
773
32
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
774
32
    auto msg = ss.str();
775
32
    LOG(INFO) << msg;
776
32
    _lru_dumper->remove_lru_dump_files();
777
32
    return msg;
778
32
}
779
780
/// Splits [offset, offset + size) into pieces of at most `_max_file_block_size` bytes and
/// returns one file block per piece, in ascending order.
///
/// For each piece a reservation is attempted with try_reserve(). If it succeeds (and no
/// earlier piece failed), a cell is added to the cache in the requested `state`.
/// Otherwise a detached block in state SKIP_CACHE is returned for the piece.
/// Note that once one reservation fails, `state` stays SKIP_CACHE for all remaining
/// pieces of this call, although try_reserve() is still invoked for each of them.
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                  const CacheContext& context, size_t offset,
                                                  size_t size, FileBlock::State state,
                                                  std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(size > 0);

    const auto range_end = offset + size; // one past the last byte
    size_t bytes_left = size;
    FileBlocks pieces;

    for (auto pos = offset; pos < range_end;) {
        const size_t piece_size = std::min(bytes_left, _max_file_block_size);
        bytes_left -= piece_size;

        if (!try_reserve(hash, context, pos, piece_size, cache_lock)) {
            state = FileBlock::State::SKIP_CACHE;
        }

        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
            // No space reserved: hand out a block that is not registered in the cache.
            FileCacheKey key;
            key.hash = hash;
            key.offset = pos;
            key.meta.type = context.cache_type;
            key.meta.expiration_time = context.expiration_time;
            key.meta.tablet_id = context.tablet_id;
            pieces.push_back(std::make_shared<FileBlock>(key, piece_size, this,
                                                         FileBlock::State::SKIP_CACHE));
        } else {
            if (auto* cell = add_cell(hash, context, pos, piece_size, state, cache_lock)) {
                pieces.push_back(cell->file_block);
                if (!context.is_cold_data) {
                    cell->update_atime();
                }
            }
            if (_ttl_mgr && context.tablet_id != 0) {
                _ttl_mgr->register_tablet_id(context.tablet_id);
            }
        }

        pos += piece_size;
    }

    DCHECK(pieces.empty() || offset + size - 1 == pieces.back()->range().right);
    return pieces;
}
828
829
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
830
                                                       const UInt128Wrapper& hash,
831
                                                       const CacheContext& context,
832
                                                       const FileBlock::Range& range,
833
741k
                                                       std::lock_guard<std::mutex>& cache_lock) {
834
    /// There are blocks [block1, ..., blockN]
835
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
836
    /// intersect with given range.
837
838
    /// It can have holes:
839
    /// [____________________]         -- requested range
840
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
841
    ///
842
    /// For each such hole create a cell with file block state EMPTY.
843
844
741k
    auto it = file_blocks.begin();
845
741k
    auto block_range = (*it)->range();
846
847
741k
    size_t current_pos = 0;
848
741k
    if (block_range.left < range.left) {
849
        ///    [_______     -- requested range
850
        /// [_______
851
        /// ^
852
        /// block1
853
854
151
        current_pos = block_range.right + 1;
855
151
        ++it;
856
741k
    } else {
857
741k
        current_pos = range.left;
858
741k
    }
859
860
1.48M
    while (current_pos <= range.right && it != file_blocks.end()) {
861
743k
        block_range = (*it)->range();
862
863
743k
        if (current_pos == block_range.left) {
864
742k
            current_pos = block_range.right + 1;
865
742k
            ++it;
866
742k
            continue;
867
742k
        }
868
869
743k
        DCHECK(current_pos < block_range.left);
870
871
173
        auto hole_size = block_range.left - current_pos;
872
873
173
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
874
173
                                                      FileBlock::State::EMPTY, cache_lock));
875
876
173
        current_pos = block_range.right + 1;
877
173
        ++it;
878
173
    }
879
880
741k
    if (current_pos <= range.right) {
881
        ///   ________]     -- requested range
882
        ///   _____]
883
        ///        ^
884
        /// blockN
885
886
276
        auto hole_size = range.right - current_pos + 1;
887
888
276
        file_blocks.splice(file_blocks.end(),
889
276
                           split_range_into_cells(hash, context, current_pos, hole_size,
890
276
                                                  FileBlock::State::EMPTY, cache_lock));
891
276
    }
892
741k
}
893
894
/// Returns a holder with the file blocks covering [offset, offset + size - 1] of `hash`.
///
/// Blocks already cached for the range are reused; parts of the range without a cached
/// block get new EMPTY (or SKIP_CACHE) blocks. Read/hit metrics and the lock-wait,
/// get and set timers in `context.stats` are updated. When asynchronous touch is enabled,
/// DOWNLOADED blocks eligible for an LRU move are queued for a background LRU update
/// after the cache lock has been released.
///
/// NOTE(review): `size == 0` would make `offset + size - 1` wrap around; callers are
/// presumably expected to pass a positive size -- confirm.
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
                                            CacheContext& context) {
    FileBlock::Range range(offset, offset + size - 1);

    ReadStatistics* stats = context.stats;
    DCHECK(stats != nullptr);
    MonotonicStopWatch lock_wait_watch;
    lock_wait_watch.start();

    FileBlocks file_blocks;
    std::vector<FileBlockSPtr> blocks_to_touch;
    const bool async_touch = config::enable_file_cache_async_touch_on_get_or_set;
    int64_t locked_duration = 0;
    {
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
        std::lock_guard cache_lock(_mutex);
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
        stats->lock_wait_timer += lock_wait_watch.elapsed_time();
        SCOPED_RAW_TIMER(&locked_duration);

        /// Get all blocks which intersect with the given range.
        {
            SCOPED_RAW_TIMER(&stats->get_timer);
            file_blocks = get_impl(hash, context, range, cache_lock);
        }

        {
            SCOPED_RAW_TIMER(&stats->set_timer);
            if (file_blocks.empty()) {
                // Nothing cached for the range: create cells for all of it.
                file_blocks = split_range_into_cells(hash, context, offset, size,
                                                     FileBlock::State::EMPTY, cache_lock);
            } else {
                // Partially cached: only the holes need new cells.
                fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock);
            }
        }
        DCHECK(!file_blocks.empty());

        *_num_read_blocks << file_blocks.size();
        if (!context.is_warmup) {
            *_no_warmup_num_read_blocks << file_blocks.size();
        }
        if (async_touch) {
            blocks_to_touch.reserve(file_blocks.size());
        }

        for (auto& block : file_blocks) {
            const size_t block_size = block->range().size();
            *_total_read_size_metrics << block_size;
            if (block->state_unsafe() != FileBlock::State::DOWNLOADED) {
                continue;
            }
            // Cache hit accounting.
            *_num_hit_blocks << 1;
            *_total_hit_size_metrics << block_size;
            if (!context.is_warmup) {
                *_no_warmup_num_hit_blocks << 1;
            }
            if (async_touch && need_to_move(block->cache_type(), context.cache_type)) {
                blocks_to_touch.emplace_back(block);
            }
        }
    }

    // Outside the cache lock: hand the hit blocks over for the deferred LRU update.
    if (async_touch) {
        for (auto& block : blocks_to_touch) {
            add_need_update_lru_block(std::move(block));
        }
    }

    *_get_or_set_latency_us << (locked_duration / 1000);
    return FileBlocksHolder(std::move(file_blocks));
}
960
961
/// Creates a file block cell and puts it into the `_files` map under [hash][offset].
///
/// Returns nullptr for an empty block (size == 0) and for a block larger than 1GiB.
/// When a cell already exists at [hash][offset], that cell is returned unchanged.
/// Otherwise the new cell is appended to the LRU queue of its cache type, the event is
/// recorded, the size counters are updated, and a pointer to the stored cell is returned.
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
                                        size_t offset, size_t size, FileBlock::State state,
                                        std::lock_guard<std::mutex>& cache_lock) {
    if (size == 0) {
        return nullptr; /// Empty files are not cached.
    }

    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
               << " expiration_time=" << context.expiration_time
               << " tablet_id=" << context.tablet_id;

    constexpr size_t kMaxBlockBytes = static_cast<size_t>(1024) * 1024 * 1024;
    if (size > kMaxBlockBytes) {
        LOG(WARNING) << "File block size is too large for a block, reject. size=" << size
                     << " hash=" << hash.to_string() << " offset=" << offset
                     << " stack:" << get_stack_trace();
        return nullptr;
    }

    auto& offsets = _files[hash];
    if (auto existing = offsets.find(offset); existing != offsets.end()) {
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
                   << ", offset: " << offset << ", size: " << size
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
        return &(existing->second);
    }

    FileCacheKey key;
    key.hash = hash;
    key.offset = offset;
    key.meta.type = context.cache_type;
    key.meta.expiration_time = context.expiration_time;
    key.meta.tablet_id = context.tablet_id;
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);

    // Reconcile cache type and expiration time: TTL without an expiration becomes NORMAL,
    // and a non-TTL type with an expiration becomes TTL.
    const bool requested_ttl = context.cache_type == FileCacheType::TTL;
    const bool has_expiration = context.expiration_time != 0;
    Status st;
    if (requested_ttl && !has_expiration) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
    } else if (!requested_ttl && has_expiration) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
    }
    if (!st.ok()) {
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
                     << " cache_type=" << cache_type_to_string(context.cache_type)
                     << " error=" << st.msg();
    }

    auto& lru_queue = get_queue(cell.file_block->cache_type());
    cell.queue_iterator = lru_queue.add(hash, offset, size, cache_lock);
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
                                      cell.size());

    if (cell.file_block->cache_type() == FileCacheType::TTL) {
        _cur_ttl_size += cell.size();
    }

    auto [stored_it, _] = offsets.insert(std::make_pair(offset, std::move(cell)));
    _cur_cache_size += size;
    return &(stored_it->second);
}
1022
1023
0
size_t BlockFileCache::try_release() {
1024
0
    SCOPED_CACHE_LOCK(_mutex, this);
1025
0
    std::vector<FileBlockCell*> trash;
1026
0
    for (auto& [hash, blocks] : _files) {
1027
0
        for (auto& [offset, cell] : blocks) {
1028
0
            if (cell.releasable()) {
1029
0
                trash.emplace_back(&cell);
1030
0
            } else {
1031
0
                cell.file_block->set_deleting();
1032
0
            }
1033
0
        }
1034
0
    }
1035
0
    size_t remove_size = 0;
1036
0
    for (auto& cell : trash) {
1037
0
        FileBlockSPtr file_block = cell->file_block;
1038
0
        std::lock_guard lc(cell->file_block->_mutex);
1039
0
        remove_size += file_block->range().size();
1040
0
        remove(file_block, cache_lock, lc);
1041
0
        VLOG_DEBUG << "try_release " << _cache_base_path
1042
0
                   << " hash=" << file_block->get_hash_value().to_string()
1043
0
                   << " offset=" << file_block->offset();
1044
0
    }
1045
0
    *_evict_by_try_release << remove_size;
1046
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
1047
0
    return trash.size();
1048
0
}
1049
1050
2.65M
/// Returns the LRU queue that holds blocks of cache type `type`
/// (INDEX, DISPOSABLE, NORMAL or TTL). An unknown type trips a DCHECK and
/// yields the normal queue.
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
    // Reuse the const overload for the type -> queue mapping. Casting the constness
    // away is fine here because `this` is non-const in this overload.
    const BlockFileCache& self = *this;
    return const_cast<LRUQueue&>(self.get_queue(type));
}
1065
1066
140
/// Read-only lookup of the LRU queue that holds blocks of cache type `type`.
/// An unknown type trips a DCHECK and yields the normal queue.
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
    switch (type) {
    case FileCacheType::INDEX:
        return _index_queue;
    case FileCacheType::DISPOSABLE:
        return _disposable_queue;
    case FileCacheType::NORMAL:
        return _normal_queue;
    case FileCacheType::TTL:
        return _ttl_queue;
    default:
        break;
    }
    // Only reached for a value outside the known cache types.
    DCHECK(false);
    return _normal_queue;
}
1081
1082
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1083
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1084
582k
                                        std::string& reason) {
1085
582k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1086
109k
        FileBlockSPtr file_block = cell->file_block;
1087
109k
        if (file_block) {
1088
109k
            std::lock_guard block_lock(file_block->_mutex);
1089
109k
            remove(file_block, cache_lock, block_lock, sync);
1090
109k
            VLOG_DEBUG << "remove_file_blocks"
1091
0
                       << " hash=" << file_block->get_hash_value().to_string()
1092
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1093
109k
        }
1094
109k
    };
1095
582k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1096
582k
}
1097
1098
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1099
                                           size_t& removed_size,
1100
                                           std::vector<FileBlockCell*>& to_evict,
1101
                                           std::lock_guard<std::mutex>& cache_lock,
1102
30.1k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1103
18.7M
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1104
18.7M
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1105
3.73k
            break;
1106
3.73k
        }
1107
18.7M
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1108
1109
18.7M
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1110
0
                     << ", offset: " << entry_offset;
1111
1112
18.7M
        size_t cell_size = cell->size();
1113
18.7M
        DCHECK(entry_size == cell_size);
1114
1115
18.7M
        if (cell->releasable()) {
1116
108k
            auto& file_block = cell->file_block;
1117
1118
108k
            std::lock_guard block_lock(file_block->_mutex);
1119
108k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1120
108k
            to_evict.push_back(cell);
1121
108k
            removed_size += cell_size;
1122
108k
            cur_removed_size += cell_size;
1123
108k
        }
1124
18.7M
    }
1125
30.1k
}
1126
1127
// 1. if the async load of the file cache has not finished
1128
//     a. evict from lru queue
1129
// 2. if ttl cache
1130
//     a. evict from disposable/normal/index queue one by one
1131
// 3. if the query limit is not reached, or there is no query limit
1132
//     a. evict from other queue
1133
//     b. evict from current queue
1134
//         a.1 if the data belong write, then just evict cold data
1135
// 4. if the query limit is reached
1136
//     a. evict from query queue
1137
//     b. evict from other queue
1138
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1139
                                 size_t offset, size_t size,
1140
377k
                                 std::lock_guard<std::mutex>& cache_lock) {
1141
377k
    if (!_async_open_done) {
1142
4
        return try_reserve_during_async_load(size, cache_lock);
1143
4
    }
1144
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1145
    // directly eliminate 5 times the size of the space
1146
377k
    if (_disk_resource_limit_mode) {
1147
93
        size = 5 * size;
1148
93
    }
1149
1150
377k
    auto query_context = config::enable_file_cache_query_limit &&
1151
377k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1152
377k
                                 ? get_query_context(context.query_id, cache_lock)
1153
377k
                                 : nullptr;
1154
377k
    if (!query_context) {
1155
377k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1156
377k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1157
191
               query_context->get_max_cache_size()) {
1158
60
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1159
60
    }
1160
131
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1161
131
                               std::chrono::steady_clock::now().time_since_epoch())
1162
131
                               .count();
1163
131
    auto& queue = get_queue(context.cache_type);
1164
131
    size_t removed_size = 0;
1165
131
    size_t ghost_remove_size = 0;
1166
131
    size_t queue_size = queue.get_capacity(cache_lock);
1167
131
    size_t cur_cache_size = _cur_cache_size;
1168
131
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1169
1170
131
    std::vector<LRUQueue::Iterator> ghost;
1171
131
    std::vector<FileBlockCell*> to_evict;
1172
1173
131
    size_t max_size = queue.get_max_size();
1174
1.18k
    auto is_overflow = [&] {
1175
1.18k
        return _disk_resource_limit_mode ? removed_size < size
1176
1.18k
                                         : cur_cache_size + size - removed_size > _capacity ||
1177
1.18k
                                                   (queue_size + size - removed_size > max_size) ||
1178
1.18k
                                                   (query_context_cache_size + size -
1179
1.17k
                                                            (removed_size + ghost_remove_size) >
1180
1.17k
                                                    query_context->get_max_cache_size());
1181
1.18k
    };
1182
1183
    /// Select the cache from the LRU queue held by query for expulsion.
1184
1.11k
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1185
1.05k
        if (!is_overflow()) {
1186
75
            break;
1187
75
        }
1188
1189
980
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1190
1191
980
        if (!cell) {
1192
            /// The cache corresponding to this record may be swapped out by
1193
            /// other queries, so it has become invalid.
1194
32
            ghost.push_back(iter);
1195
32
            ghost_remove_size += iter->size;
1196
948
        } else {
1197
948
            size_t cell_size = cell->size();
1198
948
            DCHECK(iter->size == cell_size);
1199
1200
948
            if (cell->releasable()) {
1201
108
                auto& file_block = cell->file_block;
1202
1203
108
                std::lock_guard block_lock(file_block->_mutex);
1204
108
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1205
108
                to_evict.push_back(cell);
1206
108
                removed_size += cell_size;
1207
108
            }
1208
948
        }
1209
980
    }
1210
1211
131
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1212
108
        FileBlockSPtr file_block = cell->file_block;
1213
108
        if (file_block) {
1214
108
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1215
108
            std::lock_guard block_lock(file_block->_mutex);
1216
108
            remove(file_block, cache_lock, block_lock);
1217
108
        }
1218
108
    };
1219
1220
131
    for (auto& iter : ghost) {
1221
32
        query_context->remove(iter->hash, iter->offset, cache_lock);
1222
32
    }
1223
1224
131
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1225
1226
131
    if (is_overflow() &&
1227
131
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1228
1
        return false;
1229
1
    }
1230
130
    query_context->reserve(hash, offset, size, cache_lock);
1231
130
    return true;
1232
131
}
1233
1234
371
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1235
371
    UInt128Wrapper hash = UInt128Wrapper();
1236
371
    size_t offset = 0;
1237
371
    CacheContext context;
1238
    /* we pick NORMAL and TTL cache to evict in advance
1239
     * we reserve for them but won't acutually give space to them
1240
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1241
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1242
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1243
     */
1244
371
    context.cache_type = FileCacheType::NORMAL;
1245
371
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1246
371
    context.cache_type = FileCacheType::TTL;
1247
371
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1248
371
}
1249
1250
// remove specific cache synchronously, for critical operations
1251
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1252
8
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1253
8
    std::string reason = "remove_if_cached";
1254
8
    SCOPED_CACHE_LOCK(_mutex, this);
1255
8
    auto iter = _files.find(file_key);
1256
8
    std::vector<FileBlockCell*> to_remove;
1257
8
    if (iter != _files.end()) {
1258
14
        for (auto& [_, cell] : iter->second) {
1259
14
            if (cell.releasable()) {
1260
13
                to_remove.push_back(&cell);
1261
13
            } else {
1262
1
                cell.file_block->set_deleting();
1263
1
            }
1264
14
        }
1265
6
    }
1266
8
    remove_file_blocks(to_remove, cache_lock, true, reason);
1267
8
}
1268
1269
// the async version of remove_if_cached, for background operations
1270
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1271
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1272
174k
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1273
174k
    std::string reason = "remove_if_cached_async";
1274
174k
    SCOPED_CACHE_LOCK(_mutex, this);
1275
1276
174k
    auto iter = _files.find(file_key);
1277
174k
    std::vector<FileBlockCell*> to_remove;
1278
174k
    if (iter != _files.end()) {
1279
4.81k
        for (auto& [_, cell] : iter->second) {
1280
4.81k
            *_gc_evict_bytes_metrics << cell.size();
1281
4.81k
            *_gc_evict_count_metrics << 1;
1282
4.81k
            if (cell.releasable()) {
1283
1.01k
                to_remove.push_back(&cell);
1284
3.80k
            } else {
1285
3.80k
                cell.file_block->set_deleting();
1286
3.80k
            }
1287
4.81k
        }
1288
4.43k
    }
1289
174k
    remove_file_blocks(to_remove, cache_lock, false, reason);
1290
174k
}
1291
1292
/// Returns the cache types other than `cur_cache_type`, never including TTL, in the
/// order listed below. An unknown type yields an empty vector.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    std::vector<FileCacheType> others;
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        others = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    case FileCacheType::INDEX:
        others = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
        break;
    case FileCacheType::NORMAL:
        others = {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
        break;
    case FileCacheType::DISPOSABLE:
        others = {FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    default:
        break;
    }
    return others;
}
1308
1309
26.0k
/// Returns every cache type other than `cur_cache_type` (TTL included), in the order
/// listed below. An unknown type yields an empty vector.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    std::vector<FileCacheType> others;
    switch (cur_cache_type) {
    case FileCacheType::TTL:
        others = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
        break;
    case FileCacheType::INDEX:
        others = {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
        break;
    case FileCacheType::NORMAL:
        others = {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
        break;
    case FileCacheType::DISPOSABLE:
        others = {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
        break;
    default:
        break;
    }
    return others;
}
1324
1325
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1326
57.3k
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1327
57.3k
    DCHECK(_files.find(hash) != _files.end() &&
1328
57.3k
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1329
57.3k
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1330
57.3k
    DCHECK(cell != nullptr);
1331
57.3k
    if (cell == nullptr) {
1332
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1333
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1334
0
                     << " new_size=" << new_size;
1335
0
        return;
1336
0
    }
1337
57.3k
    DCHECK_EQ(cell->file_block->_block_range.size(), old_size);
1338
57.3k
    if (cell->queue_iterator) {
1339
57.3k
        auto& queue = get_queue(cell->file_block->cache_type());
1340
57.3k
        DCHECK(queue.contains(hash, offset, cache_lock));
1341
57.3k
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1342
57.3k
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1343
57.3k
                                          cell->file_block->get_hash_value(),
1344
57.3k
                                          cell->file_block->offset(), new_size);
1345
57.3k
    }
1346
57.3k
    cell->file_block->_block_range.right = cell->file_block->_block_range.left + new_size - 1;
1347
57.3k
    _cur_cache_size -= old_size;
1348
57.3k
    _cur_cache_size += new_size;
1349
57.3k
    if (cell->file_block->cache_type() == FileCacheType::TTL) {
1350
0
        _cur_ttl_size -= old_size;
1351
0
        _cur_ttl_size += new_size;
1352
0
    }
1353
57.3k
}
1354
1355
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1356
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1357
377k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1358
377k
    size_t removed_size = 0;
1359
377k
    size_t cur_cache_size = _cur_cache_size;
1360
377k
    std::vector<FileBlockCell*> to_evict;
1361
760k
    for (FileCacheType cache_type : other_cache_types) {
1362
760k
        auto& queue = get_queue(cache_type);
1363
760k
        size_t remove_size_per_type = 0;
1364
760k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1365
704k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1366
663k
                break;
1367
663k
            }
1368
41.0k
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1369
41.0k
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1370
0
                         << ", offset: " << entry_offset;
1371
1372
41.0k
            size_t cell_size = cell->size();
1373
41.0k
            DCHECK(entry_size == cell_size);
1374
1375
41.0k
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1376
41.0k
                break;
1377
41.0k
            }
1378
1379
2
            if (cell->releasable()) {
1380
2
                auto& file_block = cell->file_block;
1381
2
                std::lock_guard block_lock(file_block->_mutex);
1382
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1383
2
                to_evict.push_back(cell);
1384
2
                removed_size += cell_size;
1385
2
                remove_size_per_type += cell_size;
1386
2
            }
1387
2
        }
1388
760k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1389
760k
    }
1390
377k
    bool is_sync_removal = !evict_in_advance;
1391
377k
    std::string reason = std::string("try_reserve_by_time ") +
1392
377k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1393
377k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1394
1395
377k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1396
377k
}
1397
1398
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1399
19.8M
                                 bool evict_in_advance) const {
1400
19.8M
    bool ret = false;
1401
19.8M
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1402
471k
        ret = (removed_size < need_size);
1403
471k
        return ret;
1404
471k
    }
1405
19.3M
    if (_disk_resource_limit_mode) {
1406
348
        ret = (removed_size < need_size);
1407
19.3M
    } else {
1408
19.3M
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1409
19.3M
    }
1410
19.3M
    return ret;
1411
19.8M
}
1412
1413
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1414
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1415
4.81k
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1416
4.81k
    size_t removed_size = 0;
1417
4.81k
    size_t cur_cache_size = _cur_cache_size;
1418
4.81k
    std::vector<FileBlockCell*> to_evict;
1419
    // we follow the privilege defined in get_other_cache_types to evict
1420
14.4k
    for (FileCacheType cache_type : other_cache_types) {
1421
14.4k
        auto& queue = get_queue(cache_type);
1422
1423
        // we will not drain each of them to the bottom -- i.e., we only
1424
        // evict what they have stolen.
1425
14.4k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1426
14.4k
        size_t cur_queue_max_size = queue.get_max_size();
1427
14.4k
        if (cur_queue_size <= cur_queue_max_size) {
1428
9.38k
            continue;
1429
9.38k
        }
1430
5.06k
        size_t cur_removed_size = 0;
1431
5.06k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1432
5.06k
                              cur_removed_size, evict_in_advance);
1433
5.06k
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1434
5.06k
    }
1435
4.81k
    bool is_sync_removal = !evict_in_advance;
1436
4.81k
    std::string reason = std::string("try_reserve_by_size") +
1437
4.81k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1438
4.81k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1439
4.81k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1440
4.81k
}
1441
1442
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1443
                                                  int64_t cur_time,
1444
                                                  std::lock_guard<std::mutex>& cache_lock,
1445
377k
                                                  bool evict_in_advance) {
1446
    // currently, TTL cache is not considered as a candidate
1447
377k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1448
377k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1449
377k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1450
377k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1451
351k
        return reserve_success;
1452
351k
    }
1453
1454
26.0k
    other_cache_types = get_other_cache_type(cur_cache_type);
1455
26.0k
    auto& cur_queue = get_queue(cur_cache_type);
1456
26.0k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1457
26.0k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1458
    // Hit the soft limit by self, cannot remove from other queues
1459
26.0k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1460
21.2k
        return false;
1461
21.2k
    }
1462
4.81k
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1463
4.81k
                                                evict_in_advance);
1464
26.0k
}
1465
1466
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1467
                                         QueryFileCacheContextPtr query_context,
1468
                                         const CacheContext& context, size_t offset, size_t size,
1469
                                         std::lock_guard<std::mutex>& cache_lock,
1470
377k
                                         bool evict_in_advance) {
1471
377k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1472
377k
                               std::chrono::steady_clock::now().time_since_epoch())
1473
377k
                               .count();
1474
377k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1475
377k
                                      evict_in_advance)) {
1476
25.0k
        auto& queue = get_queue(context.cache_type);
1477
25.0k
        size_t removed_size = 0;
1478
25.0k
        size_t cur_cache_size = _cur_cache_size;
1479
1480
25.0k
        std::vector<FileBlockCell*> to_evict;
1481
25.0k
        size_t cur_removed_size = 0;
1482
25.0k
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1483
25.0k
                              cur_removed_size, evict_in_advance);
1484
25.0k
        bool is_sync_removal = !evict_in_advance;
1485
25.0k
        std::string reason = std::string("try_reserve for cache type ") +
1486
25.0k
                             cache_type_to_string(context.cache_type) +
1487
25.0k
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1488
25.0k
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1489
25.0k
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1490
1491
25.0k
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1492
22.5k
            return false;
1493
22.5k
        }
1494
25.0k
    }
1495
1496
355k
    if (query_context) {
1497
60
        query_context->reserve(hash, offset, size, cache_lock);
1498
60
    }
1499
355k
    return true;
1500
377k
}
1501
1502
template <class T, class U>
1503
    requires IsXLock<T> && IsXLock<U>
1504
448k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1505
448k
    auto hash = file_block->get_hash_value();
1506
448k
    auto offset = file_block->offset();
1507
448k
    auto type = file_block->cache_type();
1508
448k
    auto expiration_time = file_block->expiration_time();
1509
448k
    auto tablet_id = file_block->tablet_id();
1510
448k
    auto* cell = get_cell(hash, offset, cache_lock);
1511
448k
    file_block->cell = nullptr;
1512
    // Holder cleanup can race with prior cache metadata cleanup. In that case,
1513
    // skip the duplicate remove instead of touching a detached or replaced cell.
1514
448k
    if (cell == nullptr) {
1515
1
        LOG(WARNING) << "remove skipped because cache cell is missing. hash=" << hash.to_string()
1516
1
                     << " offset=" << offset << " size=" << file_block->range().size()
1517
1
                     << " type=" << cache_type_to_string(type)
1518
1
                     << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1519
1
                     << " expiration_time=" << expiration_time << " sync=" << sync;
1520
1
        return;
1521
1
    }
1522
448k
    if (cell->file_block.get() != file_block.get()) {
1523
1
        auto* cell_file_block = cell->file_block.get();
1524
1
        LOG(WARNING)
1525
1
                << "remove skipped because cache cell points to a different file block. hash="
1526
1
                << hash.to_string() << " offset=" << offset
1527
1
                << " size=" << file_block->range().size() << " type=" << cache_type_to_string(type)
1528
1
                << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1529
1
                << " expiration_time=" << expiration_time << " sync=" << sync << " cell_block_hash="
1530
1
                << (cell_file_block ? cell_file_block->get_hash_value().to_string() : "<null>")
1531
1
                << " cell_block_offset="
1532
1
                << (cell_file_block ? std::to_string(cell_file_block->offset()) : "<null>")
1533
1
                << " cell_block_size="
1534
1
                << (cell_file_block ? std::to_string(cell_file_block->range().size()) : "<null>")
1535
1
                << " cell_block_type="
1536
1
                << (cell_file_block ? cache_type_to_string(cell_file_block->cache_type())
1537
1
                                    : "<null>")
1538
1
                << " cell_block_state="
1539
1
                << (cell_file_block ? FileBlock::state_to_string(cell_file_block->state_unsafe())
1540
1
                                    : "<null>");
1541
1
        return;
1542
1
    }
1543
448k
    DCHECK(cell->queue_iterator);
1544
448k
    if (cell->queue_iterator) {
1545
448k
        auto& queue = get_queue(file_block->cache_type());
1546
448k
        queue.remove(*cell->queue_iterator, cache_lock);
1547
448k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1548
448k
                                          cell->file_block->get_hash_value(),
1549
448k
                                          cell->file_block->offset(), cell->size());
1550
448k
    }
1551
448k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1552
448k
            << file_block->range().size();
1553
448k
    *_total_evict_size_metrics << file_block->range().size();
1554
1555
448k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1556
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1557
0
               << ", type: " << cache_type_to_string(type);
1558
1559
448k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1560
215k
        FileCacheKey key;
1561
215k
        key.hash = hash;
1562
215k
        key.offset = offset;
1563
215k
        key.meta.type = type;
1564
215k
        key.meta.expiration_time = expiration_time;
1565
215k
        key.meta.tablet_id = tablet_id;
1566
215k
        if (sync) {
1567
150k
            int64_t duration_ns = 0;
1568
150k
            Status st;
1569
150k
            {
1570
150k
                SCOPED_RAW_TIMER(&duration_ns);
1571
150k
                st = _storage->remove(key);
1572
150k
            }
1573
150k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1574
150k
            if (!st.ok()) {
1575
0
                LOG_WARNING("").error(st);
1576
0
            }
1577
150k
        } else {
1578
            // the file will be deleted in the bottom half
1579
            // so there will be a window that the file is not in the cache but still in the storage
1580
            // but it's ok, because the rowset is stale already
1581
65.4k
            bool ret = _recycle_keys.enqueue(key);
1582
65.4k
            if (ret) [[likely]] {
1583
65.4k
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1584
65.4k
            } else {
1585
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1586
0
                int64_t duration_ns = 0;
1587
0
                Status st;
1588
0
                {
1589
0
                    SCOPED_RAW_TIMER(&duration_ns);
1590
0
                    st = _storage->remove(key);
1591
0
                }
1592
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1593
0
                if (!st.ok()) {
1594
0
                    LOG_WARNING("").error(st);
1595
0
                }
1596
0
            }
1597
65.4k
        }
1598
232k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1599
100
        file_block->set_deleting();
1600
100
        return;
1601
100
    }
1602
448k
    _cur_cache_size -= file_block->range().size();
1603
448k
    if (FileCacheType::TTL == type) {
1604
1.29k
        _cur_ttl_size -= file_block->range().size();
1605
1.29k
    }
1606
448k
    auto it = _files.find(hash);
1607
448k
    if (it != _files.end()) {
1608
448k
        it->second.erase(file_block->offset());
1609
448k
        if (it->second.empty()) {
1610
113k
            _files.erase(hash);
1611
113k
        }
1612
448k
    }
1613
448k
    *_num_removed_blocks << 1;
1614
448k
}
1615
1616
46
/// Locking wrapper around get_used_cache_size_unlocked(): takes the cache lock and
/// returns the value reported for the queue of `cache_type`.
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t used_size = get_used_cache_size_unlocked(cache_type, cache_lock);
    return used_size;
}
1620
1621
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1622
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1623
46
    return get_queue(cache_type).get_capacity(cache_lock);
1624
46
}
1625
1626
0
/// Locking wrapper around get_available_cache_size_unlocked(): takes the cache lock and
/// returns the value computed for the queue of `cache_type`.
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t available_size = get_available_cache_size_unlocked(cache_type, cache_lock);
    return available_size;
}
1630
1631
size_t BlockFileCache::get_available_cache_size_unlocked(
1632
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1633
0
    return get_queue(cache_type).get_max_element_size() -
1634
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1635
0
}
1636
1637
91
/// Locking wrapper around get_file_blocks_num_unlocked(): takes the cache lock and
/// returns the number of elements in the queue of `cache_type`.
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t blocks_num = get_file_blocks_num_unlocked(cache_type, cache_lock);
    return blocks_num;
}
1641
1642
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1643
91
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1644
91
    return get_queue(cache_type).get_elements_num(cache_lock);
1645
91
}
1646
1647
/// Creates the cell for `file_block` and links the block back to this cell.
/// `cache_lock` is not used in the body; it documents that the cache mutex is held.
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    file_block->cell = this;

    /**
     * A cell can be created with a file block in the DOWNLOADED, EMPTY or SKIP_CACHE state.
     * The file block acquires the DOWNLOADING state and creates its LRUQueue iterator on
     * the first successful getOrSetDownloader call.
     */
    switch (file_block->_download_state) {
    case FileBlock::State::DOWNLOADED:
    case FileBlock::State::EMPTY:
    case FileBlock::State::SKIP_CACHE:
        // Valid initial states: nothing to do.
        break;
    default:
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                      << FileBlock::state_to_string(file_block->_download_state);
    }

    // Cells of TTL blocks get their access time set at creation.
    const bool is_ttl_block = file_block->cache_type() == FileCacheType::TTL;
    if (is_ttl_block) {
        update_atime();
    }
}
1670
1671
/// Appends an entry for (`hash`, `offset`) of `size` bytes to the end of the queue,
/// indexes it in the lookup map and adds `size` to the queue's byte counter.
/// @return iterator to the new queue entry.
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size += size;
    auto new_entry = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
    auto lookup_key = std::make_pair(hash, offset);
    map.insert(std::make_pair(lookup_key, new_entry));
    return new_entry;
}
1678
1679
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1680
0
    queue.clear();
1681
0
    map.clear();
1682
0
    cache_size = 0;
1683
0
}
1684
1685
1.33M
/// Relinks the entry at `queue_it` to the end of the queue via splice(); the entry is
/// moved, not copied.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    auto tail = queue.end();
    queue.splice(tail, queue, queue_it);
}
1688
1689
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1690
114k
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1691
114k
    cache_size -= queue_it->size;
1692
114k
    queue_it->size = new_size;
1693
114k
    cache_size += new_size;
1694
114k
}
1695
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1696
57.3k
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1697
57.3k
    return map.find(std::make_pair(hash, offset)) != map.end();
1698
57.3k
}
1699
1700
// Looks up the list iterator stored for (hash, offset).
// When the key is absent a value-initialized list iterator is returned; it does not
// refer to any element and must not be dereferenced.
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    const auto pos = map.find(std::make_pair(hash, offset));
    if (pos == map.end()) {
        return std::list<FileKeyAndOffset>::iterator();
    }
    return pos->second;
}
1708
1709
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1710
2
    std::string result;
1711
18
    for (const auto& [hash, offset, size] : queue) {
1712
18
        if (!result.empty()) {
1713
16
            result += ", ";
1714
16
        }
1715
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1716
18
    }
1717
2
    return result;
1718
2
}
1719
1720
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1721
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1722
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1723
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1724
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1725
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1726
1727
5
    size_t m = vec1.size();
1728
5
    size_t n = vec2.size();
1729
1730
    // Create a 2D vector (matrix) to store the Levenshtein distances
1731
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1732
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1733
1734
    // Initialize the first row and column of the matrix
1735
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1736
22
    for (size_t i = 0; i <= m; ++i) {
1737
17
        dp[i][0] = i;
1738
17
    }
1739
20
    for (size_t j = 0; j <= n; ++j) {
1740
15
        dp[0][j] = j;
1741
15
    }
1742
1743
    // Fill the matrix using dynamic programming
1744
17
    for (size_t i = 1; i <= m; ++i) {
1745
38
        for (size_t j = 1; j <= n; ++j) {
1746
            // Check if the current elements of both vectors are equal
1747
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1748
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1749
26
                                  ? 0
1750
26
                                  : 1;
1751
            // Calculate the minimum cost of three possible operations:
1752
            // 1. Insertion: dp[i][j-1] + 1
1753
            // 2. Deletion: dp[i-1][j] + 1
1754
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1755
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1756
26
        }
1757
12
    }
1758
    // The bottom-right cell of the matrix contains the Levenshtein distance
1759
5
    return dp[m][n];
1760
5
}
1761
1762
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1763
1
    SCOPED_CACHE_LOCK(_mutex, this);
1764
1
    return dump_structure_unlocked(hash, cache_lock);
1765
1
}
1766
1767
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1768
1
                                                    std::lock_guard<std::mutex>&) {
1769
1
    std::stringstream result;
1770
1
    auto it = _files.find(hash);
1771
1
    if (it == _files.end()) {
1772
0
        return std::string("");
1773
0
    }
1774
1
    const auto& cells_by_offset = it->second;
1775
1776
1
    for (const auto& [_, cell] : cells_by_offset) {
1777
1
        result << cell.file_block->get_info_for_log() << " "
1778
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1779
1
    }
1780
1781
1
    return result.str();
1782
1
}
1783
1784
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1785
0
    SCOPED_CACHE_LOCK(_mutex, this);
1786
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1787
0
}
1788
1789
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1790
                                                            size_t offset,
1791
0
                                                            std::lock_guard<std::mutex>&) {
1792
0
    std::stringstream result;
1793
0
    auto it = _files.find(hash);
1794
0
    if (it == _files.end()) {
1795
0
        return std::string("");
1796
0
    }
1797
0
    const auto& cells_by_offset = it->second;
1798
0
    const auto& cell = cells_by_offset.find(offset);
1799
1800
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1801
0
}
1802
1803
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1804
                                       FileCacheType new_type,
1805
2.64k
                                       std::lock_guard<std::mutex>& cache_lock) {
1806
2.64k
    if (auto iter = _files.find(hash); iter != _files.end()) {
1807
2.64k
        auto& file_blocks = iter->second;
1808
2.64k
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1809
2.64k
            FileBlockCell& cell = cell_it->second;
1810
2.64k
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1811
2.64k
            DCHECK(cell.queue_iterator.has_value());
1812
2.64k
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1813
2.64k
            _lru_recorder->record_queue_event(
1814
2.64k
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1815
2.64k
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1816
2.64k
            auto& new_queue = get_queue(new_type);
1817
2.64k
            cell.queue_iterator =
1818
2.64k
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1819
2.64k
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1820
2.64k
                                              cell.file_block->get_hash_value(),
1821
2.64k
                                              cell.file_block->offset(), cell.size());
1822
2.64k
        }
1823
2.64k
    }
1824
2.64k
}
1825
1826
// @brief: get a path's disk capacity used percent, inode used percent
1827
// @param: path
1828
// @param: percent.first disk used percent, percent.second inode used percent
1829
9.53k
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1830
9.53k
    struct statfs stat;
1831
9.53k
    int ret = statfs(path.c_str(), &stat);
1832
9.53k
    if (ret != 0) {
1833
2
        return ret;
1834
2
    }
1835
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1836
    // v->used = stat.f_blocks - stat.f_bfree
1837
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1838
9.53k
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1839
9.53k
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1840
9.53k
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1841
1842
9.53k
    unsigned long long inode_free = stat.f_ffree;
1843
9.53k
    unsigned long long inode_total = stat.f_files;
1844
9.53k
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1845
9.53k
    percent->first = capacity_percentage;
1846
9.53k
    percent->second = 100 - inode_percentage;
1847
1848
    // Add sync point for testing
1849
9.53k
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1850
1851
9.53k
    return 0;
1852
9.53k
}
1853
1854
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1855
10
    using namespace std::chrono;
1856
10
    int64_t space_released = 0;
1857
10
    size_t old_capacity = 0;
1858
10
    std::stringstream ss;
1859
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1860
10
    auto adjust_start_time = steady_clock::time_point();
1861
10
    {
1862
10
        SCOPED_CACHE_LOCK(_mutex, this);
1863
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1864
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1865
4
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1866
4
                int64_t queue_released = 0;
1867
4
                std::vector<FileBlockCell*> to_evict;
1868
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1869
13
                    if (need_remove_size <= 0) {
1870
1
                        break;
1871
1
                    }
1872
12
                    need_remove_size -= entry_size;
1873
12
                    space_released += entry_size;
1874
12
                    queue_released += entry_size;
1875
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1876
12
                    if (!cell->releasable()) {
1877
0
                        cell->file_block->set_deleting();
1878
0
                        continue;
1879
0
                    }
1880
12
                    to_evict.push_back(cell);
1881
12
                }
1882
12
                for (auto& cell : to_evict) {
1883
12
                    FileBlockSPtr file_block = cell->file_block;
1884
12
                    std::lock_guard block_lock(file_block->_mutex);
1885
12
                    remove(file_block, cache_lock, block_lock);
1886
12
                }
1887
4
                return queue_released;
1888
4
            };
1889
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1890
1
            ss << " disposable_queue released " << queue_released;
1891
1
            queue_released = remove_blocks(_normal_queue);
1892
1
            ss << " normal_queue released " << queue_released;
1893
1
            queue_released = remove_blocks(_index_queue);
1894
1
            ss << " index_queue released " << queue_released;
1895
1
            queue_released = remove_blocks(_ttl_queue);
1896
1
            ss << " ttl_queue released " << queue_released;
1897
1898
1
            _disk_resource_limit_mode = true;
1899
1
            _disk_limit_mode_metrics->set_value(1);
1900
1
            ss << " total_space_released=" << space_released;
1901
1
        }
1902
10
        old_capacity = _capacity;
1903
10
        _capacity = new_capacity;
1904
10
        _cache_capacity_metrics->set_value(_capacity);
1905
10
    }
1906
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1907
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1908
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1909
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1910
10
    LOG(INFO) << ss.str();
1911
10
    return ss.str();
1912
10
}
1913
1914
4.85k
void BlockFileCache::check_disk_resource_limit() {
1915
4.85k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1916
23
        return;
1917
23
    }
1918
1919
4.83k
    bool previous_mode = _disk_resource_limit_mode;
1920
4.83k
    if (_capacity > _cur_cache_size) {
1921
4.83k
        _disk_resource_limit_mode = false;
1922
4.83k
        _disk_limit_mode_metrics->set_value(0);
1923
4.83k
    }
1924
4.83k
    std::pair<int, int> percent;
1925
4.83k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1926
4.83k
    if (ret != 0) {
1927
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1928
1
        return;
1929
1
    }
1930
4.83k
    auto [space_percentage, inode_percentage] = percent;
1931
9.66k
    auto is_insufficient = [](const int& percentage) {
1932
9.66k
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1933
9.66k
    };
1934
4.83k
    DCHECK_GE(space_percentage, 0);
1935
4.83k
    DCHECK_LE(space_percentage, 100);
1936
4.83k
    DCHECK_GE(inode_percentage, 0);
1937
4.83k
    DCHECK_LE(inode_percentage, 100);
1938
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1939
    // FIXME: reject with config validator
1940
4.83k
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1941
4.83k
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1942
1
        LOG_WARNING("config error, set to default value")
1943
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1944
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1945
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1946
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1947
1
    }
1948
4.83k
    bool is_space_insufficient = is_insufficient(space_percentage);
1949
4.83k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1950
4.83k
    if (is_space_insufficient || is_inode_insufficient) {
1951
5
        _disk_resource_limit_mode = true;
1952
5
        _disk_limit_mode_metrics->set_value(1);
1953
4.82k
    } else if (_disk_resource_limit_mode &&
1954
4.82k
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1955
4.82k
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1956
0
        _disk_resource_limit_mode = false;
1957
0
        _disk_limit_mode_metrics->set_value(0);
1958
0
    }
1959
4.83k
    if (previous_mode != _disk_resource_limit_mode) {
1960
        // add log for disk resource limit mode switching
1961
10
        if (_disk_resource_limit_mode) {
1962
5
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1963
5
                         << " space_percent=" << space_percentage
1964
5
                         << " inode_percent=" << inode_percentage
1965
5
                         << " is_space_insufficient=" << is_space_insufficient
1966
5
                         << " is_inode_insufficient=" << is_inode_insufficient
1967
5
                         << " enter threshold="
1968
5
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1969
5
        } else {
1970
5
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1971
5
                      << " space_percent=" << space_percentage
1972
5
                      << " inode_percent=" << inode_percentage << " exit threshold="
1973
5
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1974
5
        }
1975
4.82k
    } else if (_disk_resource_limit_mode) {
1976
        // print log for disk resource limit mode running, but less frequently
1977
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1978
0
                                 << " space_percent=" << space_percentage
1979
0
                                 << " inode_percent=" << inode_percentage
1980
0
                                 << " is_space_insufficient=" << is_space_insufficient
1981
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1982
0
                                 << " mode run in resource limit";
1983
0
    }
1984
4.83k
}
1985
1986
4.70k
void BlockFileCache::check_need_evict_cache_in_advance() {
1987
4.70k
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1988
1
        return;
1989
1
    }
1990
1991
4.70k
    std::pair<int, int> percent;
1992
4.70k
    int ret = disk_used_percentage(_cache_base_path, &percent);
1993
4.70k
    if (ret != 0) {
1994
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1995
1
        return;
1996
1
    }
1997
4.70k
    auto [space_percentage, inode_percentage] = percent;
1998
4.70k
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
1999
14.1k
    auto is_insufficient = [](const int& percentage) {
2000
14.1k
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
2001
14.1k
    };
2002
4.70k
    DCHECK_GE(space_percentage, 0);
2003
4.70k
    DCHECK_LE(space_percentage, 100);
2004
4.70k
    DCHECK_GE(inode_percentage, 0);
2005
4.70k
    DCHECK_LE(inode_percentage, 100);
2006
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
2007
    // FIXME: reject with config validator
2008
4.70k
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
2009
4.70k
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
2010
1
        LOG_WARNING("config error, set to default value")
2011
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
2012
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
2013
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
2014
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
2015
1
    }
2016
4.70k
    bool previous_mode = _need_evict_cache_in_advance;
2017
4.70k
    bool is_space_insufficient = is_insufficient(space_percentage);
2018
4.70k
    bool is_inode_insufficient = is_insufficient(inode_percentage);
2019
4.70k
    bool is_size_insufficient = is_insufficient(size_percentage);
2020
4.70k
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
2021
87
        _need_evict_cache_in_advance = true;
2022
87
        _need_evict_cache_in_advance_metrics->set_value(1);
2023
4.61k
    } else if (_need_evict_cache_in_advance &&
2024
4.61k
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2025
4.61k
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2026
4.61k
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
2027
68
        _need_evict_cache_in_advance = false;
2028
68
        _need_evict_cache_in_advance_metrics->set_value(0);
2029
68
    }
2030
4.70k
    if (previous_mode != _need_evict_cache_in_advance) {
2031
        // add log for evict cache in advance mode switching
2032
140
        if (_need_evict_cache_in_advance) {
2033
72
            LOG(WARNING) << "Entering evict cache in advance mode: "
2034
72
                         << "file_cache=" << get_base_path()
2035
72
                         << " space_percent=" << space_percentage
2036
72
                         << " inode_percent=" << inode_percentage
2037
72
                         << " size_percent=" << size_percentage
2038
72
                         << " is_space_insufficient=" << is_space_insufficient
2039
72
                         << " is_inode_insufficient=" << is_inode_insufficient
2040
72
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
2041
72
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
2042
72
        } else {
2043
68
            LOG(INFO) << "Exiting evict cache in advance mode: "
2044
68
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
2045
68
                      << " inode_percent=" << inode_percentage
2046
68
                      << " size_percent=" << size_percentage << " exit threshold="
2047
68
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
2048
68
        }
2049
4.56k
    } else if (_need_evict_cache_in_advance) {
2050
        // print log for evict cache in advance mode running, but less frequently
2051
16
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2052
4
                                 << " space_percent=" << space_percentage
2053
4
                                 << " inode_percent=" << inode_percentage
2054
4
                                 << " size_percent=" << size_percentage
2055
4
                                 << " is_space_insufficient=" << is_space_insufficient
2056
4
                                 << " is_inode_insufficient=" << is_inode_insufficient
2057
4
                                 << " is_size_insufficient=" << is_size_insufficient
2058
4
                                 << " need evict cache in advance";
2059
16
    }
2060
4.70k
}
2061
2062
163
void BlockFileCache::run_background_monitor() {
2063
163
    Thread::set_self_name("run_background_monitor");
2064
4.86k
    while (!_close) {
2065
4.85k
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
2066
4.85k
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
2067
4.85k
        check_disk_resource_limit();
2068
4.85k
        if (config::enable_evict_file_cache_in_advance) {
2069
4.69k
            check_need_evict_cache_in_advance();
2070
4.69k
        } else {
2071
163
            _need_evict_cache_in_advance = false;
2072
163
            _need_evict_cache_in_advance_metrics->set_value(0);
2073
163
        }
2074
2075
4.85k
        {
2076
4.85k
            std::unique_lock close_lock(_close_mtx);
2077
4.85k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2078
4.85k
            if (_close) {
2079
160
                break;
2080
160
            }
2081
4.85k
        }
2082
        // report
2083
4.69k
        {
2084
4.69k
            SCOPED_CACHE_LOCK(_mutex, this);
2085
4.69k
            _cur_cache_size_metrics->set_value(_cur_cache_size);
2086
4.69k
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
2087
4.69k
                                                   _index_queue.get_capacity(cache_lock) -
2088
4.69k
                                                   _normal_queue.get_capacity(cache_lock) -
2089
4.69k
                                                   _disposable_queue.get_capacity(cache_lock));
2090
4.69k
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
2091
4.69k
                    _ttl_queue.get_capacity(cache_lock));
2092
4.69k
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
2093
4.69k
                    _ttl_queue.get_elements_num(cache_lock));
2094
4.69k
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
2095
4.69k
            _cur_normal_queue_element_count_metrics->set_value(
2096
4.69k
                    _normal_queue.get_elements_num(cache_lock));
2097
4.69k
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
2098
4.69k
            _cur_index_queue_element_count_metrics->set_value(
2099
4.69k
                    _index_queue.get_elements_num(cache_lock));
2100
4.69k
            _cur_disposable_queue_cache_size_metrics->set_value(
2101
4.69k
                    _disposable_queue.get_capacity(cache_lock));
2102
4.69k
            _cur_disposable_queue_element_count_metrics->set_value(
2103
4.69k
                    _disposable_queue.get_elements_num(cache_lock));
2104
2105
            // Update meta store write queue size if storage is FSFileCacheStorage
2106
4.69k
            if (_storage->get_type() == FileCacheStorageType::DISK) {
2107
4.69k
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
2108
4.69k
                if (fs_storage != nullptr) {
2109
4.69k
                    auto* meta_store = fs_storage->get_meta_store();
2110
4.69k
                    if (meta_store != nullptr) {
2111
4.69k
                        _meta_store_write_queue_size_metrics->set_value(
2112
4.69k
                                meta_store->get_write_queue_size());
2113
4.69k
                    }
2114
4.69k
                }
2115
4.69k
            }
2116
2117
4.69k
            if (_num_read_blocks->get_value() > 0) {
2118
3.15k
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
2119
3.15k
                                      (double)_num_read_blocks->get_value());
2120
3.15k
            }
2121
4.69k
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
2122
1.86k
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
2123
1.86k
                                         (double)_num_read_blocks_5m->get_value());
2124
1.86k
            }
2125
4.69k
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
2126
3.14k
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2127
3.14k
                                         (double)_num_read_blocks_1h->get_value());
2128
3.14k
            }
2129
2130
4.69k
            if (_no_warmup_num_read_blocks->get_value() > 0) {
2131
3.15k
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2132
3.15k
                                                (double)_no_warmup_num_read_blocks->get_value());
2133
3.15k
            }
2134
4.69k
            if (_no_warmup_num_read_blocks_5m && _no_warmup_num_read_blocks_5m->get_value() > 0) {
2135
1.86k
                _no_warmup_hit_ratio_5m->set_value(
2136
1.86k
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2137
1.86k
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2138
1.86k
            }
2139
4.69k
            if (_no_warmup_num_read_blocks_1h && _no_warmup_num_read_blocks_1h->get_value() > 0) {
2140
3.14k
                _no_warmup_hit_ratio_1h->set_value(
2141
3.14k
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2142
3.14k
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2143
3.14k
            }
2144
4.69k
        }
2145
4.69k
    }
2146
163
}
2147
2148
163
void BlockFileCache::run_background_gc() {
2149
163
    Thread::set_self_name("run_background_gc");
2150
163
    FileCacheKey key;
2151
163
    size_t batch_count = 0;
2152
233k
    while (!_close) {
2153
233k
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2154
233k
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2155
233k
        {
2156
233k
            std::unique_lock close_lock(_close_mtx);
2157
233k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2158
233k
            if (_close) {
2159
160
                break;
2160
160
            }
2161
233k
        }
2162
2163
298k
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2164
65.4k
            int64_t duration_ns = 0;
2165
65.4k
            Status st;
2166
65.4k
            {
2167
65.4k
                SCOPED_RAW_TIMER(&duration_ns);
2168
65.4k
                st = _storage->remove(key);
2169
65.4k
            }
2170
65.4k
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2171
2172
65.4k
            if (!st.ok()) {
2173
0
                LOG_WARNING("").error(st);
2174
0
            }
2175
65.4k
            batch_count++;
2176
65.4k
        }
2177
233k
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2178
233k
        batch_count = 0;
2179
233k
    }
2180
163
}
2181
2182
163
void BlockFileCache::run_background_evict_in_advance() {
2183
163
    Thread::set_self_name("run_background_evict_in_advance");
2184
163
    LOG(INFO) << "Starting background evict in advance thread";
2185
163
    int64_t batch = 0;
2186
23.6k
    while (!_close) {
2187
23.6k
        {
2188
23.6k
            std::unique_lock close_lock(_close_mtx);
2189
23.6k
            _close_cv.wait_for(
2190
23.6k
                    close_lock,
2191
23.6k
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2192
23.6k
            if (_close) {
2193
160
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2194
160
                break;
2195
160
            }
2196
23.6k
        }
2197
23.5k
        batch = config::file_cache_evict_in_advance_batch_bytes;
2198
2199
        // Skip if eviction not needed or too many pending recycles
2200
23.5k
        if (!_need_evict_cache_in_advance ||
2201
23.5k
            _recycle_keys.size_approx() >=
2202
23.1k
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2203
23.1k
            continue;
2204
23.1k
        }
2205
2206
374
        int64_t duration_ns = 0;
2207
374
        {
2208
374
            SCOPED_CACHE_LOCK(_mutex, this);
2209
374
            SCOPED_RAW_TIMER(&duration_ns);
2210
374
            try_evict_in_advance(batch, cache_lock);
2211
374
        }
2212
374
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2213
374
    }
2214
163
}
2215
2216
163
void BlockFileCache::run_background_block_lru_update() {
2217
163
    Thread::set_self_name("run_background_block_lru_update");
2218
163
    std::vector<FileBlockSPtr> batch;
2219
4.88k
    while (!_close) {
2220
4.88k
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2221
4.88k
        size_t batch_limit =
2222
4.88k
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2223
4.88k
        {
2224
4.88k
            std::unique_lock close_lock(_close_mtx);
2225
4.88k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2226
4.88k
            if (_close) {
2227
160
                break;
2228
160
            }
2229
4.88k
        }
2230
2231
4.72k
        batch.clear();
2232
4.72k
        batch.reserve(batch_limit);
2233
4.72k
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2234
4.72k
        if (drained == 0) {
2235
3.92k
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2236
3.92k
            continue;
2237
3.92k
        }
2238
799
        *_need_update_lru_blocks_consume_metrics << drained;
2239
2240
799
        int64_t duration_ns = 0;
2241
799
        {
2242
799
            SCOPED_CACHE_LOCK(_mutex, this);
2243
799
            SCOPED_RAW_TIMER(&duration_ns);
2244
138k
            for (auto& block : batch) {
2245
138k
                update_block_lru(block, cache_lock);
2246
138k
            }
2247
799
        }
2248
799
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2249
799
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2250
799
    }
2251
163
}
2252
2253
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2254
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2255
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2256
2
                               std::chrono::steady_clock::now().time_since_epoch())
2257
2
                               .count();
2258
2
    SCOPED_CACHE_LOCK(_mutex, this);
2259
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2260
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2261
5
        for (auto& pair : _files.find(hash)->second) {
2262
5
            const FileBlockCell* cell = &pair.second;
2263
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2264
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2265
4
                    (cell->atime != 0 &&
2266
3
                     cur_time - cell->atime <
2267
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2268
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2269
3
                                             cell->file_block->cache_type(),
2270
3
                                             cell->file_block->expiration_time());
2271
3
                }
2272
4
            }
2273
5
        }
2274
2
    }
2275
2
    return blocks_meta;
2276
2
}
2277
2278
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2279
4
                                                   std::lock_guard<std::mutex>& cache_lock) {
2280
4
    size_t removed_size = 0;
2281
4
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2282
4
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2283
4
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2284
2285
4
    std::vector<FileBlockCell*> to_evict;
2286
4
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2287
4
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2288
4
            if (!_disk_resource_limit_mode || removed_size >= size) {
2289
3
                break;
2290
3
            }
2291
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2292
2293
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2294
0
                         << ", offset: " << entry_offset;
2295
2296
1
            size_t cell_size = cell->size();
2297
1
            DCHECK(entry_size == cell_size);
2298
2299
1
            if (cell->releasable()) {
2300
1
                auto& file_block = cell->file_block;
2301
2302
1
                std::lock_guard block_lock(file_block->_mutex);
2303
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2304
1
                to_evict.push_back(cell);
2305
1
                removed_size += cell_size;
2306
1
            }
2307
1
        }
2308
3
    };
2309
4
    if (disposable_queue_size != 0) {
2310
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2311
0
    }
2312
4
    if (normal_queue_size != 0) {
2313
3
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2314
3
    }
2315
4
    if (index_queue_size != 0) {
2316
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2317
0
    }
2318
4
    std::string reason = "async load";
2319
4
    remove_file_blocks(to_evict, cache_lock, true, reason);
2320
2321
4
    return !_disk_resource_limit_mode || removed_size >= size;
2322
4
}
2323
2324
32
void BlockFileCache::clear_need_update_lru_blocks() {
2325
32
    _need_update_lru_blocks.clear();
2326
32
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2327
32
}
2328
2329
287k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2330
287k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2331
287k
    SCOPED_CACHE_LOCK(_mutex, this);
2332
287k
    if (_files.contains(hash)) {
2333
249k
        for (auto& [offset, cell] : _files[hash]) {
2334
249k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2335
242k
                cell.file_block->_owned_by_cached_reader = true;
2336
242k
                offset_to_block.emplace(offset, cell.file_block);
2337
242k
            }
2338
249k
        }
2339
242k
    }
2340
287k
    return offset_to_block;
2341
287k
}
2342
2343
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2344
0
    SCOPED_CACHE_LOCK(_mutex, this);
2345
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2346
0
        for (auto& [_, cell] : iter->second) {
2347
0
            cell.update_atime();
2348
0
        }
2349
0
    };
2350
0
}
2351
2352
163
void BlockFileCache::run_background_lru_log_replay() {
2353
163
    Thread::set_self_name("run_background_lru_log_replay");
2354
19.4M
    while (!_close) {
2355
19.4M
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2356
19.4M
        {
2357
19.4M
            std::unique_lock close_lock(_close_mtx);
2358
19.4M
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2359
19.4M
            if (_close) {
2360
143
                break;
2361
143
            }
2362
19.4M
        }
2363
2364
19.4M
        replay_lru_logs_once();
2365
19.4M
    }
2366
163
}
2367
2368
19.4M
size_t BlockFileCache::replay_lru_logs_once() {
2369
19.4M
    size_t replayed = 0;
2370
77.6M
    for (FileCacheType type : LRU_LOG_REPLAY_TYPES) {
2371
77.6M
        replayed += _lru_recorder->replay_queue_event(type);
2372
77.6M
    }
2373
2374
19.4M
    if (replayed == 0) {
2375
19.2M
        *_lru_recorder_log_replay_idle_metrics << 1;
2376
19.2M
    }
2377
2378
19.4M
    if (config::enable_evaluate_shadow_queue_diff) {
2379
0
        SCOPED_CACHE_LOCK(_mutex, this);
2380
0
        _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2381
0
        _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2382
0
        _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2383
0
        _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2384
0
    }
2385
19.4M
    return replayed;
2386
19.4M
}
2387
2388
400
void BlockFileCache::dump_lru_queues(bool force) {
2389
400
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2390
400
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2391
400
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2392
400
        _lru_dumper->dump_queue("disposable", force);
2393
400
        _lru_dumper->dump_queue("normal", force);
2394
400
        _lru_dumper->dump_queue("index", force);
2395
400
        _lru_dumper->dump_queue("ttl", force);
2396
400
        _lru_dumper->set_first_dump_done();
2397
400
    }
2398
400
}
2399
2400
163
void BlockFileCache::run_background_lru_dump() {
2401
163
    Thread::set_self_name("run_background_lru_dump");
2402
566
    while (!_close) {
2403
563
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2404
563
        {
2405
563
            std::unique_lock close_lock(_close_mtx);
2406
563
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2407
563
            if (_close) {
2408
160
                break;
2409
160
            }
2410
563
        }
2411
403
        dump_lru_queues(false);
2412
403
    }
2413
163
}
2414
2415
163
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2416
    // keep this order coz may be duplicated in different queue, we use the first appearence
2417
163
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2418
163
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2419
163
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2420
163
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2421
163
}
2422
2423
165
std::map<std::string, double> BlockFileCache::get_stats() {
2424
165
    std::map<std::string, double> stats;
2425
165
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2426
165
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2427
165
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2428
2429
165
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2430
165
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2431
165
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2432
165
    stats["index_queue_curr_elements"] =
2433
165
            (double)_cur_index_queue_element_count_metrics->get_value();
2434
2435
165
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2436
165
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2437
165
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2438
165
    stats["ttl_queue_curr_elements"] =
2439
165
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2440
2441
165
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2442
165
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2443
165
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2444
165
    stats["normal_queue_curr_elements"] =
2445
165
            (double)_cur_normal_queue_element_count_metrics->get_value();
2446
2447
165
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2448
165
    stats["disposable_queue_curr_size"] =
2449
165
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2450
165
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2451
165
    stats["disposable_queue_curr_elements"] =
2452
165
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2453
2454
165
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2455
165
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2456
2457
165
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2458
165
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2459
165
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2460
2461
165
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2462
165
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2463
165
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2464
2465
165
    return stats;
2466
165
}
2467
2468
// for be UTs
2469
172
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2470
172
    std::map<std::string, double> stats;
2471
172
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2472
172
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2473
172
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2474
2475
172
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2476
172
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2477
172
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2478
172
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2479
2480
172
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2481
172
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2482
172
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2483
172
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2484
2485
172
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2486
172
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2487
172
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2488
172
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2489
2490
172
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2491
172
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2492
172
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2493
172
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2494
2495
172
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2496
172
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2497
2498
172
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2499
172
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2500
172
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2501
2502
172
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2503
172
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2504
172
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2505
2506
172
    return stats;
2507
172
}
2508
2509
// Explicit instantiation of BlockFileCache::remove for the case where both the
// cache lock and the block lock are std::lock_guard<std::mutex>.
template void BlockFileCache::remove(FileBlockSPtr file_block,
2510
                                     std::lock_guard<std::mutex>& cache_lock,
2511
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2512
2513
1
/// Runs check_file_cache_consistency() and appends one human-readable report per
/// detected inconsistency to `results`.
///
/// Each report contains the manager-side info, the storage-side info and the
/// inconsistency type, in that order.
///
/// @param results  output vector; new reports are appended after existing elements.
/// @return the error from check_file_cache_consistency() if it fails, otherwise OK.
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext inconsistency_context;
    RETURN_IF_ERROR(check_file_cache_consistency(inconsistency_context));

    const size_t report_count = inconsistency_context.types.size();
    // Reserve room for the new reports on top of what the caller already stored;
    // reserving only `report_count` is a no-op for a non-empty vector.
    results.reserve(results.size() + report_count);

    for (size_t i = 0; i < report_count; i++) {
        std::string result;
        result += "File cache info in manager:\n";
        result += inconsistency_context.infos_in_manager[i].to_string();
        result += "File cache info in storage:\n";
        result += inconsistency_context.infos_in_storage[i].to_string();
        result += inconsistency_context.types[i].to_string();
        result += "\n";
        results.push_back(std::move(result));
    }
    return Status::OK();
}
2530
2531
1
/// Compares the block infos reported by `_storage` with the in-memory cells and
/// records every mismatch in `inconsistency_context`.
///
/// The three vectors of the context (`infos_in_manager`, `infos_in_storage`,
/// `types`) are appended to in lockstep, one entry each per inconsistency:
///   - NOT_LOADED: storage has a block with no cell (or no file block) in memory;
///     the manager-side entry is default-constructed.
///   - a combination of TMP_FILE_EXPECT_DOWNLOADING_STATE, SIZE_INCONSISTENT,
///     CACHE_TYPE_INCONSISTENT and EXPIRATION_TIME_INCONSISTENT for blocks
///     present on both sides.
///   - MISSING_IN_STORAGE: an in-memory cell that storage did not report; the
///     storage-side entry is default-constructed.
///
/// The cache mutex is held for the whole check.
///
/// @return the error from `_storage->get_file_cache_infos()` if it fails, otherwise OK.
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);

    std::vector<FileCacheInfo> infos_in_storage;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));

    auto& manager_side = inconsistency_context.infos_in_manager;
    auto& storage_side = inconsistency_context.infos_in_storage;
    auto& mismatch_types = inconsistency_context.types;

    // Pass 1: every block known to storage is checked against its in-memory cell.
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> confirmed_blocks;
    for (const auto& on_disk : infos_in_storage) {
        confirmed_blocks.insert({on_disk.hash, on_disk.offset});

        auto* cell = get_cell(on_disk.hash, on_disk.offset, cache_lock);
        if (cell == nullptr || cell->file_block == nullptr) {
            manager_side.emplace_back();
            storage_side.push_back(on_disk);
            mismatch_types.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }

        FileCacheInfo in_memory {
                .hash = on_disk.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = on_disk.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};

        InconsistencyType mismatch;
        if (on_disk.is_tmp != in_memory.is_tmp) {
            mismatch |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }

        // A block that is still downloading is compared by its downloading size.
        const size_t expected_size = in_memory.is_tmp ? cell->dowloading_size() : in_memory.size;
        if (on_disk.size != expected_size) {
            mismatch |= InconsistencyType::SIZE_INCONSISTENT;
        }

        // The cache type is compared only when the tmp/downloading flag was not
        // raised above.
        if ((mismatch & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
            on_disk.cache_type != in_memory.cache_type) {
            mismatch |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }

        if (on_disk.expiration_time != in_memory.expiration_time) {
            mismatch |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }

        if (mismatch != InconsistencyType::NONE) {
            manager_side.push_back(in_memory);
            storage_side.push_back(on_disk);
            mismatch_types.push_back(mismatch);
        }
    }

    // Pass 2: in-memory cells that storage did not report at all.
    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (confirmed_blocks.contains({hash, offset})) {
                continue;
            }
            const auto& block = cell.file_block;
            const bool is_downloading = block->state() == FileBlock::State::DOWNLOADING;
            manager_side.emplace_back(hash, block->expiration_time(), cell.size(), offset,
                                      is_downloading, block->cache_type());
            storage_side.emplace_back();
            mismatch_types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
        }
    }
    return Status::OK();
}
2591
2592
} // namespace doris::io