Coverage Report

Created: 2026-06-12 14:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/Cache/FileCache.cpp
19
// and modified by Doris
20
21
#include "io/cache/block_file_cache.h"
22
23
#include <gen_cpp/file_cache.pb.h>
24
25
#include <cstdio>
26
#include <exception>
27
#include <fstream>
28
#include <unordered_set>
29
30
#include "common/status.h"
31
#include "cpp/sync_point.h"
32
#include "runtime/exec_env.h"
33
34
#if defined(__APPLE__)
35
#include <sys/mount.h>
36
#else
37
#include <sys/statfs.h>
38
#endif
39
40
#include <chrono> // IWYU pragma: keep
41
#include <mutex>
42
#include <ranges>
43
44
#include "common/cast_set.h"
45
#include "common/check.h"
46
#include "common/config.h"
47
#include "common/logging.h"
48
#include "core/uint128.h"
49
#include "exec/common/sip_hash.h"
50
#include "io/cache/block_file_cache_ttl_mgr.h"
51
#include "io/cache/file_block.h"
52
#include "io/cache/file_cache_common.h"
53
#include "io/cache/fs_file_cache_storage.h"
54
#include "io/cache/mem_file_cache_storage.h"
55
#include "runtime/runtime_profile.h"
56
#include "util/concurrency_stats.h"
57
#include "util/stack_util.h"
58
#include "util/stopwatch.hpp"
59
#include "util/thread.h"
60
#include "util/time.h"
61
namespace doris::io {
62
63
namespace {
64
65
constexpr std::array<FileCacheType, 4> LRU_LOG_REPLAY_TYPES = {
66
        FileCacheType::TTL, FileCacheType::INDEX, FileCacheType::NORMAL, FileCacheType::DISPOSABLE};
67
68
720
size_t file_cache_type_index(FileCacheType type) {
69
720
    return static_cast<size_t>(type);
70
720
}
71
72
} // namespace
73
74
// Insert a block pointer into one shard while swallowing allocation failures.
75
193k
bool NeedUpdateLRUBlocks::insert(FileBlockSPtr block, size_t max_queue_size) {
76
193k
    if (!block || max_queue_size == 0) {
77
1
        return false;
78
1
    }
79
193k
    bool reserved = false;
80
193k
    try {
81
193k
        auto* raw_ptr = block.get();
82
193k
        auto idx = shard_index(raw_ptr);
83
193k
        auto& shard = _shards[idx];
84
193k
        std::lock_guard lock(shard.mutex);
85
193k
        if (shard.entries.contains(raw_ptr)) {
86
191k
            return false;
87
191k
        }
88
1.56k
        size_t cur_size = _size.load(std::memory_order_relaxed);
89
1.56k
        while (cur_size < max_queue_size) {
90
1.56k
            if (_size.compare_exchange_weak(cur_size, cur_size + 1, std::memory_order_relaxed)) {
91
1.56k
                reserved = true;
92
1.56k
                break;
93
1.56k
            }
94
1.56k
        }
95
1.56k
        if (!reserved) {
96
1
            return false;
97
1
        }
98
1.55k
        auto [_, inserted] = shard.entries.emplace(raw_ptr, std::move(block));
99
1.55k
        DORIS_CHECK(inserted);
100
1.55k
        return true;
101
1.56k
    } catch (const std::exception& e) {
102
0
        if (reserved) {
103
0
            decrease_size(1);
104
0
        }
105
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: " << e.what();
106
0
    } catch (...) {
107
0
        if (reserved) {
108
0
            decrease_size(1);
109
0
        }
110
0
        LOG(WARNING) << "Failed to enqueue block for LRU update: unknown error";
111
0
    }
112
0
    return false;
113
193k
}
114
115
// Drain up to `limit` unique blocks to the caller, keeping the structure consistent on failures.
116
50
size_t NeedUpdateLRUBlocks::drain(size_t limit, std::vector<FileBlockSPtr>* output) {
117
50
    if (limit == 0 || output == nullptr) {
118
2
        return 0;
119
2
    }
120
48
    size_t drained = 0;
121
48
    try {
122
48
        output->reserve(output->size() + std::min(limit, size()));
123
3.02k
        for (auto& shard : _shards) {
124
3.02k
            if (drained >= limit) {
125
1
                break;
126
1
            }
127
3.02k
            std::lock_guard lock(shard.mutex);
128
3.02k
            auto it = shard.entries.begin();
129
3.02k
            size_t shard_drained = 0;
130
3.03k
            while (it != shard.entries.end() && drained + shard_drained < limit) {
131
10
                output->emplace_back(std::move(it->second));
132
10
                it = shard.entries.erase(it);
133
10
                ++shard_drained;
134
10
            }
135
3.02k
            if (shard_drained > 0) {
136
7
                decrease_size(shard_drained);
137
7
                drained += shard_drained;
138
7
            }
139
3.02k
        }
140
48
    } catch (const std::exception& e) {
141
0
        LOG(WARNING) << "Failed to drain LRU update blocks: " << e.what();
142
0
    } catch (...) {
143
0
        LOG(WARNING) << "Failed to drain LRU update blocks: unknown error";
144
0
    }
145
48
    return drained;
146
48
}
147
148
// Remove every pending block, guarding against unexpected exceptions.
149
23
void NeedUpdateLRUBlocks::clear() {
150
23
    try {
151
1.47k
        for (auto& shard : _shards) {
152
1.47k
            std::lock_guard lock(shard.mutex);
153
1.47k
            if (!shard.entries.empty()) {
154
2
                auto removed = shard.entries.size();
155
2
                shard.entries.clear();
156
2
                decrease_size(removed);
157
2
            }
158
1.47k
        }
159
23
    } catch (const std::exception& e) {
160
0
        LOG(WARNING) << "Failed to clear LRU update blocks: " << e.what();
161
0
    } catch (...) {
162
0
        LOG(WARNING) << "Failed to clear LRU update blocks: unknown error";
163
0
    }
164
23
}
165
166
9
void NeedUpdateLRUBlocks::decrease_size(size_t delta) {
167
9
    size_t cur_size = _size.load(std::memory_order_relaxed);
168
9
    while (true) {
169
9
        DORIS_CHECK_GE(cur_size, delta);
170
9
        if (_size.compare_exchange_weak(cur_size, cur_size - delta, std::memory_order_relaxed)) {
171
9
            return;
172
9
        }
173
9
    }
174
9
}
175
176
193k
// Pick the shard for `ptr` by hashing the pointer value and masking it with kShardMask.
size_t NeedUpdateLRUBlocks::shard_index(FileBlock* ptr) const {
    DCHECK(ptr != nullptr);
    const size_t hashed = std::hash<FileBlock*> {}(ptr);
    return hashed & kShardMask;
}
180
181
// Builds a cache rooted at `cache_base_path`: registers every bvar metric, sizes the four LRU
// queues from `cache_settings`, creates the LRU recorder/dumper and selects the storage backend.
//
// All metrics are registered with the original `cache_base_path` as their prefix, including
// for the "memory" storage, where `_cache_base_path` is overwritten with "memory" only after
// the metrics have been created.
BlockFileCache::BlockFileCache(const std::string& cache_base_path,
                               const FileCacheSettings& cache_settings)
        : _cache_base_path(cache_base_path),
          _capacity(cache_settings.capacity),
          _max_file_block_size(cache_settings.max_file_block_size) {
    // `_cache_base_path` is not modified until the storage selection at the end of this
    // constructor, so the pointer stays valid for every metric registration below.
    const char* const prefix = _cache_base_path.c_str();

    // Small factories so each metric below is a single "member = kind(name)" line.
    auto size_status = [prefix](const auto& name, size_t initial) {
        return std::make_shared<bvar::Status<size_t>>(prefix, name, initial);
    };
    auto ratio_status = [prefix](const auto& name) {
        return std::make_shared<bvar::Status<double>>(prefix, name, 0.0);
    };
    auto adder = [prefix](const auto& name) {
        return std::make_shared<bvar::Adder<size_t>>(prefix, name);
    };
    auto latency = [prefix](const auto& name) {
        return std::make_shared<bvar::LatencyRecorder>(prefix, name);
    };

    // ---- current size / element-count gauges ----
    _cur_cache_size_metrics = size_status("file_cache_cache_size", 0);
    _cache_capacity_metrics = size_status("file_cache_capacity", _capacity);
    _cur_ttl_cache_size_metrics = size_status("file_cache_ttl_cache_size", 0);
    _cur_normal_queue_element_count_metrics =
            size_status("file_cache_normal_queue_element_count", 0);
    _cur_ttl_cache_lru_queue_cache_size_metrics =
            size_status("file_cache_ttl_cache_lru_queue_size", 0);
    _cur_ttl_cache_lru_queue_element_count_metrics =
            size_status("file_cache_ttl_cache_lru_queue_element_count", 0);
    _cur_normal_queue_cache_size_metrics = size_status("file_cache_normal_queue_cache_size", 0);
    _cur_index_queue_element_count_metrics =
            size_status("file_cache_index_queue_element_count", 0);
    _cur_index_queue_cache_size_metrics = size_status("file_cache_index_queue_cache_size", 0);
    _cur_disposable_queue_element_count_metrics =
            size_status("file_cache_disposable_queue_element_count", 0);
    _cur_disposable_queue_cache_size_metrics =
            size_status("file_cache_disposable_queue_cache_size", 0);

    // ---- eviction / read / hit byte counters ----
    _queue_evict_size_metrics[0] = adder("file_cache_index_queue_evict_size");
    _queue_evict_size_metrics[1] = adder("file_cache_normal_queue_evict_size");
    _queue_evict_size_metrics[2] = adder("file_cache_disposable_queue_evict_size");
    _queue_evict_size_metrics[3] = adder("file_cache_ttl_cache_evict_size");
    _total_evict_size_metrics = adder("file_cache_total_evict_size");
    _total_read_size_metrics = adder("file_cache_total_read_size");
    _total_hit_size_metrics = adder("file_cache_total_hit_size");
    _gc_evict_bytes_metrics = adder("file_cache_gc_evict_bytes");
    _gc_evict_count_metrics = adder("file_cache_gc_evict_count");

    // ---- evict-by-time counters, one per ordered pair of distinct cache types ----
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_time_disposable_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            adder("file_cache_evict_by_time_disposable_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            adder("file_cache_evict_by_time_disposable_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_time_normal_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_time_normal_to_index");
    _evict_by_time_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            adder("file_cache_evict_by_time_normal_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_time_index_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_time_index_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            adder("file_cache_evict_by_time_index_to_ttl");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_time_ttl_to_disposable");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_time_ttl_to_normal");
    _evict_by_time_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_time_ttl_to_index");

    // ---- evict-by-self-LRU counters, one per cache type ----
    _evict_by_self_lru_metrics_matrix[FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_self_lru_disposable");
    _evict_by_self_lru_metrics_matrix[FileCacheType::NORMAL] =
            adder("file_cache_evict_by_self_lru_normal");
    _evict_by_self_lru_metrics_matrix[FileCacheType::INDEX] =
            adder("file_cache_evict_by_self_lru_index");
    _evict_by_self_lru_metrics_matrix[FileCacheType::TTL] =
            adder("file_cache_evict_by_self_lru_ttl");

    // ---- evict-by-size counters, one per ordered pair of distinct cache types ----
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_size_disposable_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::INDEX] =
            adder("file_cache_evict_by_size_disposable_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::DISPOSABLE][FileCacheType::TTL] =
            adder("file_cache_evict_by_size_disposable_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_size_normal_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_size_normal_to_index");
    _evict_by_size_metrics_matrix[FileCacheType::NORMAL][FileCacheType::TTL] =
            adder("file_cache_evict_by_size_normal_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_size_index_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_size_index_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::INDEX][FileCacheType::TTL] =
            adder("file_cache_evict_by_size_index_to_ttl");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::DISPOSABLE] =
            adder("file_cache_evict_by_size_ttl_to_disposable");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::NORMAL] =
            adder("file_cache_evict_by_size_ttl_to_normal");
    _evict_by_size_metrics_matrix[FileCacheType::TTL][FileCacheType::INDEX] =
            adder("file_cache_evict_by_size_ttl_to_index");

    _evict_by_try_release = adder("file_cache_evict_by_try_release");

    // ---- block counters ----
    _num_read_blocks = adder("file_cache_num_read_blocks");
    _num_hit_blocks = adder("file_cache_num_hit_blocks");
    _num_removed_blocks = adder("file_cache_num_removed_blocks");

    _no_warmup_num_read_blocks = adder("file_cache_no_warmup_num_read_blocks");
    _no_warmup_num_hit_blocks = adder("file_cache_no_warmup_num_hit_blocks");

#ifndef BE_TEST
    // Windowed views (300 s and 3600 s) over the block counters above; skipped under BE_TEST.
    auto window = [prefix](const auto& name, bvar::Adder<size_t>* source, int seconds) {
        return std::make_shared<bvar::Window<bvar::Adder<size_t>>>(prefix, name, source, seconds);
    };
    _num_hit_blocks_5m = window("file_cache_num_hit_blocks_5m", _num_hit_blocks.get(), 300);
    _num_read_blocks_5m = window("file_cache_num_read_blocks_5m", _num_read_blocks.get(), 300);
    _num_hit_blocks_1h = window("file_cache_num_hit_blocks_1h", _num_hit_blocks.get(), 3600);
    _num_read_blocks_1h = window("file_cache_num_read_blocks_1h", _num_read_blocks.get(), 3600);
    _no_warmup_num_hit_blocks_5m = window("file_cache_no_warmup_num_hit_blocks_5m",
                                          _no_warmup_num_hit_blocks.get(), 300);
    _no_warmup_num_read_blocks_5m = window("file_cache_no_warmup_num_read_blocks_5m",
                                           _no_warmup_num_read_blocks.get(), 300);
    _no_warmup_num_hit_blocks_1h = window("file_cache_no_warmup_num_hit_blocks_1h",
                                          _no_warmup_num_hit_blocks.get(), 3600);
    _no_warmup_num_read_blocks_1h = window("file_cache_no_warmup_num_read_blocks_1h",
                                           _no_warmup_num_read_blocks.get(), 3600);
#endif

    // ---- hit ratios ----
    _hit_ratio = ratio_status("file_cache_hit_ratio");
    _hit_ratio_5m = ratio_status("file_cache_hit_ratio_5m");
    _hit_ratio_1h = ratio_status("file_cache_hit_ratio_1h");

    _no_warmup_hit_ratio = ratio_status("file_cache_no_warmup_hit_ratio");
    _no_warmup_hit_ratio_5m = ratio_status("file_cache_no_warmup_hit_ratio_5m");
    _no_warmup_hit_ratio_1h = ratio_status("file_cache_no_warmup_hit_ratio_1h");

    // ---- mode flags and queue sizes ----
    _disk_limit_mode_metrics = size_status("file_cache_disk_limit_mode", 0);
    _need_evict_cache_in_advance_metrics =
            size_status("file_cache_need_evict_cache_in_advance", 0);
    _meta_store_write_queue_size_metrics =
            size_status("file_cache_meta_store_write_queue_size", 0);

    // ---- latency / length recorders and their companion counters ----
    _cache_lock_wait_time_us = latency("file_cache_cache_lock_wait_time_us");
    _get_or_set_latency_us = latency("file_cache_get_or_set_latency_us");
    _storage_sync_remove_latency_us = latency("file_cache_storage_sync_remove_latency_us");
    _storage_retry_sync_remove_latency_us =
            latency("file_cache_storage_retry_sync_remove_latency_us");
    _storage_async_remove_latency_us = latency("file_cache_storage_async_remove_latency_us");
    _evict_in_advance_latency_us = latency("file_cache_evict_in_advance_latency_us");
    _lru_dump_latency_us = latency("file_cache_lru_dump_latency_us");
    _recycle_keys_length_recorder = latency("file_cache_recycle_keys_length");
    _need_update_lru_blocks_length_recorder = latency("file_cache_need_update_lru_blocks_length");
    _need_update_lru_blocks_produce_metrics = adder("file_cache_need_update_lru_blocks_produce");
    _need_update_lru_blocks_consume_metrics = adder("file_cache_need_update_lru_blocks_consume");
    _update_lru_blocks_latency_us = latency("file_cache_update_lru_blocks_latency_us");
    _ttl_gc_latency_us = latency("file_cache_ttl_gc_latency_us");
    _shadow_queue_levenshtein_distance = latency("file_cache_shadow_queue_levenshtein_distance");

    // ---- per-cache-type LRU recorder queue metrics ----
    for (FileCacheType type : {FileCacheType::DISPOSABLE, FileCacheType::NORMAL,
                               FileCacheType::INDEX, FileCacheType::TTL}) {
        const size_t slot = file_cache_type_index(type);
        const std::string queue_metric =
                "file_cache_lru_recorder_" + cache_type_to_string(type) + "_record_queue";
        _lru_recorder_queue_length_recorder[slot] = latency(queue_metric + "_length");
        _lru_recorder_queue_produce_metrics[slot] = adder(queue_metric + "_produce");
        _lru_recorder_queue_consume_metrics[slot] = adder(queue_metric + "_consume");
    }
    _lru_recorder_log_replay_idle_metrics = adder("file_cache_lru_recorder_log_replay_idle");

    // ---- LRU queues: (max size, max elements, third argument in seconds) ----
    _disposable_queue = LRUQueue(cache_settings.disposable_queue_size,
                                 cache_settings.disposable_queue_elements, 60 * 60);
    _index_queue = LRUQueue(cache_settings.index_queue_size, cache_settings.index_queue_elements,
                            7 * 24 * 60 * 60);
    _normal_queue = LRUQueue(cache_settings.query_queue_size, cache_settings.query_queue_elements,
                             24 * 60 * 60);
    _ttl_queue = LRUQueue(cache_settings.ttl_queue_size, cache_settings.ttl_queue_elements,
                          std::numeric_limits<int>::max());

    _lru_recorder = std::make_unique<LRUQueueRecorder>(this);
    _lru_dumper = std::make_unique<CacheLRUDumper>(this, _lru_recorder.get());

    // ---- storage backend ----
    const bool in_memory = (cache_settings.storage == "memory");
    if (in_memory) {
        _storage = std::make_unique<MemFileCacheStorage>();
        _cache_base_path = "memory";
    } else {
        _storage = std::make_unique<FSFileCacheStorage>();
    }

    LOG(INFO) << "file cache path= " << _cache_base_path << " " << cache_settings.to_string();
}
436
437
14.5k
// Hashes the bytes of `path` with sip_hash128 and wraps the 128-bit result.
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
    uint128_t digest;
    // sip_hash128 writes its result through a char* output buffer.
    auto* const digest_bytes = reinterpret_cast<char*>(&digest);
    sip_hash128(path.data(), path.size(), digest_bytes);
    return UInt128Wrapper(digest);
}
442
443
// Returns a holder for the per-query cache context of `query_id`, or an empty pointer when
// config::enable_file_cache_query_limit is off. The cache lock is held for the whole call.
BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder(
        const TUniqueId& query_id, int file_cache_query_limit_percent) {
    SCOPED_CACHE_LOCK(_mutex, this);
    if (config::enable_file_cache_query_limit) {
        // The limit is enabled: look up (or create) the context for this query and wrap it.
        auto query_context =
                get_or_set_query_context(query_id, cache_lock, file_cache_query_limit_percent);
        return std::make_unique<QueryFileCacheContextHolder>(query_id, this, query_context);
    }
    return {};
}
455
456
// Looks up the context registered for `query_id`; returns nullptr when there is none.
// The `cache_lock` parameter is not used in the body.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock) {
    const auto found = _query_map.find(query_id);
    if (found == _query_map.end()) {
        return nullptr;
    }
    return found->second;
}
461
462
6
void BlockFileCache::remove_query_context(const TUniqueId& query_id) {
463
6
    SCOPED_CACHE_LOCK(_mutex, this);
464
6
    const auto& query_iter = _query_map.find(query_id);
465
466
6
    if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) {
467
5
        _query_map.erase(query_iter);
468
5
    }
469
6
}
470
471
// Returns the context registered for `query_id`, creating and registering one when missing.
// Returns nullptr for the all-zero query id. A new context is limited to
// `_capacity * file_cache_query_limit_percent / 100` bytes; a warning is logged when that
// limit is below 256MB.
BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_or_set_query_context(
        const TUniqueId& query_id, std::lock_guard<std::mutex>& cache_lock,
        int file_cache_query_limit_percent) {
    const bool is_zero_id = (query_id.lo == 0 && query_id.hi == 0);
    if (is_zero_id) {
        return nullptr;
    }

    if (auto existing = get_query_context(query_id, cache_lock)) {
        return existing;
    }

    // 256MB; limits below this only trigger the warning, they are still applied.
    constexpr size_t recommended_min_bytes = 268435456;
    // NOTE(review): a negative percent would wrap around in this unsigned arithmetic --
    // presumably the session variable is validated upstream; confirm.
    const size_t limit_bytes = _capacity * file_cache_query_limit_percent / 100;
    if (limit_bytes < recommended_min_bytes) {
        LOG(WARNING) << "The user-set file cache query limit (" << limit_bytes
                     << " bytes) is less than the 256MB recommended minimum. "
                     << "Consider increasing the session variable 'file_cache_query_limit_percent'"
                     << " from its current value " << file_cache_query_limit_percent << "%.";
    }
    auto created = std::make_shared<QueryFileCacheContext>(limit_bytes);
    return _query_map.emplace(query_id, created).first->second;
}
494
495
void BlockFileCache::QueryFileCacheContext::remove(const UInt128Wrapper& hash, size_t offset,
496
20
                                                   std::lock_guard<std::mutex>& cache_lock) {
497
20
    auto pair = std::make_pair(hash, offset);
498
20
    auto record = records.find(pair);
499
20
    DCHECK(record != records.end());
500
20
    auto iter = record->second;
501
20
    records.erase(pair);
502
20
    lru_queue.remove(iter, cache_lock);
503
20
}
504
505
void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, size_t offset,
506
                                                    size_t size,
507
30
                                                    std::lock_guard<std::mutex>& cache_lock) {
508
30
    auto pair = std::make_pair(hash, offset);
509
30
    if (records.find(pair) == records.end()) {
510
29
        auto queue_iter = lru_queue.add(hash, offset, size, cache_lock);
511
29
        records.insert({pair, queue_iter});
512
29
    }
513
30
}
514
515
155
// Takes the cache lock and forwards to initialize_unlocked(), returning its status.
Status BlockFileCache::initialize() {
    SCOPED_CACHE_LOCK(_mutex, this);
    Status init_status = initialize_unlocked(cache_lock);
    return init_status;
}
519
520
155
// Performs one-time initialization with the cache lock already held:
//   1. optionally restores the LRU queues from disk,
//   2. initializes the storage backend (an error is returned to the caller),
//   3. creates the TTL manager when the storage is an FSFileCacheStorage that
//      exposes a meta store,
//   4. starts the background worker threads.
// Must not be called twice (checked by DCHECK).
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(!_is_initialized);
    _is_initialized = true;

    if (config::file_cache_background_lru_dump_tail_record_num > 0) {
        // Requirements:
        // 1. restored data must not overwrite the last dump
        // 2. the restore must happen before load and async load
        // 3. all queues are restored sequentially to avoid conflicts
        // TODO(zhengyu): this could be parallelized at the cost of extra complexity; check the
        // time cost first to see whether the improvement is necessary.
        restore_lru_queues_from_disk(cache_lock);
    }

    RETURN_IF_ERROR(_storage->init(this));

    // The TTL manager needs the meta store, which only the FS-backed storage provides.
    auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
    if (fs_storage != nullptr) {
        auto* meta_store = fs_storage->get_meta_store();
        if (meta_store != nullptr) {
            _ttl_mgr = std::make_unique<BlockFileCacheTtlMgr>(this, meta_store);
        }
    }

    // Background workers, started in this fixed order.
    _cache_background_monitor_thread = std::thread(&BlockFileCache::run_background_monitor, this);
    _cache_background_gc_thread = std::thread(&BlockFileCache::run_background_gc, this);
    _cache_background_evict_in_advance_thread =
            std::thread(&BlockFileCache::run_background_evict_in_advance, this);
    _cache_background_block_lru_update_thread =
            std::thread(&BlockFileCache::run_background_block_lru_update, this);

    // LRU dump and LRU log replay workers.
    _cache_background_lru_dump_thread = std::thread(&BlockFileCache::run_background_lru_dump, this);
    _cache_background_lru_log_replay_thread =
            std::thread(&BlockFileCache::run_background_lru_log_replay, this);

    return Status::OK();
}
554
555
void BlockFileCache::update_block_lru(FileBlockSPtr block,
556
7
                                      std::lock_guard<std::mutex>& cache_lock) {
557
7
    if (!block) {
558
1
        return;
559
1
    }
560
561
6
    FileBlockCell* cell = get_cell(block->get_hash_value(), block->offset(), cache_lock);
562
6
    if (!cell || cell->file_block.get() != block.get()) {
563
1
        return;
564
1
    }
565
566
5
    if (cell->queue_iterator) {
567
5
        auto& queue = get_queue(block->cache_type());
568
5
        queue.move_to_end(*cell->queue_iterator, cache_lock);
569
5
        _lru_recorder->record_queue_event(block->cache_type(), CacheLRULogType::MOVETOBACK,
570
5
                                          block->_key.hash, block->_key.offset,
571
5
                                          block->_block_range.size());
572
5
    }
573
5
    cell->update_atime();
574
5
}
575
576
void BlockFileCache::use_cell(const FileBlockCell& cell, FileBlocks* result, bool move_iter_flag,
577
712k
                              std::lock_guard<std::mutex>& cache_lock) {
578
712k
    if (result) {
579
712k
        result->push_back(cell.file_block);
580
712k
    }
581
582
712k
    auto& queue = get_queue(cell.file_block->cache_type());
583
    /// Move to the end of the queue. The iterator remains valid.
584
712k
    if (!config::enable_file_cache_async_touch_on_get_or_set && cell.queue_iterator &&
585
712k
        move_iter_flag) {
586
520k
        queue.move_to_end(*cell.queue_iterator, cache_lock);
587
520k
        _lru_recorder->record_queue_event(cell.file_block->cache_type(),
588
520k
                                          CacheLRULogType::MOVETOBACK, cell.file_block->_key.hash,
589
520k
                                          cell.file_block->_key.offset, cell.size());
590
520k
    }
591
592
712k
    cell.update_atime();
593
712k
}
594
595
template <class T>
596
    requires IsXLock<T>
597
FileBlockCell* BlockFileCache::get_cell(const UInt128Wrapper& hash, size_t offset,
598
6.01k
                                        T& /* cache_lock */) {
599
6.01k
    auto it = _files.find(hash);
600
6.01k
    if (it == _files.end()) {
601
3
        return nullptr;
602
3
    }
603
604
6.01k
    auto& offsets = it->second;
605
6.01k
    auto cell_it = offsets.find(offset);
606
6.01k
    if (cell_it == offsets.end()) {
607
3
        return nullptr;
608
3
    }
609
610
6.01k
    return &cell_it->second;
611
6.01k
}
612
613
904k
// Returns true unless the cell's type or the querying context's type is DISPOSABLE.
bool BlockFileCache::need_to_move(FileCacheType cell_type, FileCacheType query_type) const {
    const bool involves_disposable = (query_type == FileCacheType::DISPOSABLE) ||
                                     (cell_type == FileCacheType::DISPOSABLE);
    return !involves_disposable;
}
616
617
/// Given range = [left, right] and the non-overlapping, ordered set of file blocks cached for
/// `hash`, returns the list [block1, ..., blockN] of blocks which intersect with the range.
/// Every returned block is passed through use_cell(). An empty list is returned when nothing
/// intersects or nothing is cached for `hash`.
FileBlocks BlockFileCache::get_impl(const UInt128Wrapper& hash, const CacheContext& context,
                                    const FileBlock::Range& range,
                                    std::lock_guard<std::mutex>& cache_lock) {
    auto file_it = _files.find(hash);
    if (file_it == _files.end()) {
        if (_async_open_done) {
            return {};
        }
        // The async open has not finished yet: ask the storage to load this key's blocks
        // directly, then look the hash up again.
        FileCacheKey key;
        key.hash = hash;
        key.meta.type = context.cache_type;
        key.meta.expiration_time = context.expiration_time;
        key.meta.tablet_id = context.tablet_id;
        _storage->load_blocks_directly_unlocked(this, key, cache_lock);

        file_it = _files.find(hash);
        if (file_it == _files.end()) [[unlikely]] {
            return {};
        }
    }

    auto& cells_by_offset = file_it->second;
    if (cells_by_offset.empty()) {
        // Not expected: log it, assert in debug builds, and drop the empty entry.
        LOG(WARNING) << "file_blocks is empty for hash=" << hash.to_string()
                     << " cache type=" << context.cache_type
                     << " cache expiration time=" << context.expiration_time
                     << " cache range=" << range.left << " " << range.right
                     << " query id=" << context.query_id;
        DCHECK(false);
        _files.erase(hash);
        return {};
    }

    FileBlocks result;
    // Collects `cell` into `result` and touches it according to need_to_move().
    const auto collect = [&](const FileBlockCell& cell) {
        use_cell(cell, &result, need_to_move(cell.file_block->cache_type(), context.cache_type),
                 cache_lock);
    };

    auto block_it = cells_by_offset.lower_bound(range.left);
    if (block_it == cells_by_offset.end()) {
        /// N - last cached block for given file hash, block{N}.offset < range.left:
        ///   block{N}                       block{N}
        /// [________                         [_______]
        ///     [__________]         OR                  [________]
        ///     ^                                        ^
        ///     range.left                               range.left
        const auto& last_cell = cells_by_offset.rbegin()->second;
        if (last_cell.file_block->range().right < range.left) {
            return {};
        }
        collect(last_cell);
        return result;
    }

    /// block_it <-- block{k}
    if (block_it != cells_by_offset.begin()) {
        const auto& prev_cell = std::prev(block_it)->second;
        if (range.left <= prev_cell.file_block->range().right) {
            ///   block{k-1}  block{k}
            ///   [________]   [_____
            ///       [___________
            ///       ^
            ///       range.left
            collect(prev_cell);
        }
    }

    ///  block{k} ...       block{k-1}  block{k}                      block{k}
    ///  [______              [______]     [____                        [________
    ///  [_________     OR              [________      OR    [______]   ^
    ///  ^                              ^                           ^   block{k}.offset
    ///  range.left                     range.left                  range.right
    for (; block_it != cells_by_offset.end(); ++block_it) {
        const auto& cell = block_it->second;
        if (range.right < cell.file_block->range().left) {
            break;
        }
        collect(cell);
    }

    return result;
}
707
708
193k
// Queues `block` for a deferred LRU update.
// The queue size limit comes from config; a non-positive setting is passed on as 0.
// The produce counter and the queue length recorder are updated only when the
// insert reports success.
void BlockFileCache::add_need_update_lru_block(FileBlockSPtr block) {
    const int64_t configured_limit = config::file_cache_background_block_lru_update_queue_max_size;
    size_t max_queue_size = 0;
    if (configured_limit > 0) {
        max_queue_size = static_cast<size_t>(configured_limit);
    }

    if (!_need_update_lru_blocks.insert(std::move(block), max_queue_size)) {
        return;
    }
    *_need_update_lru_blocks_produce_metrics << 1;
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
}
716
717
5
std::string BlockFileCache::clear_file_cache_async() {
718
5
    LOG(INFO) << "start clear_file_cache_async, path=" << _cache_base_path;
719
5
    _lru_dumper->remove_lru_dump_files();
720
5
    int64_t num_cells_all = 0;
721
5
    int64_t num_cells_to_delete = 0;
722
5
    int64_t num_cells_wait_recycle = 0;
723
5
    int64_t num_files_all = 0;
724
5
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async");
725
5
    {
726
5
        SCOPED_CACHE_LOCK(_mutex, this);
727
728
5
        std::vector<FileBlockCell*> deleting_cells;
729
6
        for (auto& [_, offset_to_cell] : _files) {
730
6
            ++num_files_all;
731
39
            for (auto& [_1, cell] : offset_to_cell) {
732
39
                ++num_cells_all;
733
39
                deleting_cells.push_back(&cell);
734
39
            }
735
6
        }
736
737
        // we cannot delete the element in the loop above, because it will break the iterator
738
39
        for (auto& cell : deleting_cells) {
739
39
            if (!cell->releasable()) {
740
2
                LOG(INFO) << "cell is not releasable, hash="
741
2
                          << " offset=" << cell->file_block->offset();
742
2
                cell->file_block->set_deleting();
743
2
                ++num_cells_wait_recycle;
744
2
                continue;
745
2
            }
746
37
            FileBlockSPtr file_block = cell->file_block;
747
37
            if (file_block) {
748
37
                std::lock_guard block_lock(file_block->_mutex);
749
37
                remove(file_block, cache_lock, block_lock, false);
750
37
                ++num_cells_to_delete;
751
37
            }
752
37
        }
753
5
        clear_need_update_lru_blocks();
754
5
    }
755
756
5
    std::stringstream ss;
757
5
    ss << "finish clear_file_cache_async, path=" << _cache_base_path
758
5
       << " num_files_all=" << num_files_all << " num_cells_all=" << num_cells_all
759
5
       << " num_cells_to_delete=" << num_cells_to_delete
760
5
       << " num_cells_wait_recycle=" << num_cells_wait_recycle;
761
5
    auto msg = ss.str();
762
5
    LOG(INFO) << msg;
763
5
    _lru_dumper->remove_lru_dump_files();
764
5
    return msg;
765
5
}
766
767
// Covers [offset, offset + size) with new file blocks of at most `_max_file_block_size`
// bytes each and returns them in ascending order.
//
// For every piece, space is reserved via try_reserve(). When the reservation succeeds the
// piece becomes a cache cell in `state`; otherwise a detached SKIP_CACHE block is returned
// for it.
// NOTE(review): once a reservation fails, `state` stays SKIP_CACHE for all remaining pieces
// of this call, even if a later try_reserve() succeeds — confirm this is intended.
FileBlocks BlockFileCache::split_range_into_cells(const UInt128Wrapper& hash,
                                                  const CacheContext& context, size_t offset,
                                                  size_t size, FileBlock::State state,
                                                  std::lock_guard<std::mutex>& cache_lock) {
    DCHECK(size > 0);

    const size_t range_end = offset + size; // exclusive
    size_t bytes_left = size;
    FileBlocks file_blocks;

    for (size_t pos = offset; pos < range_end;) {
        const size_t piece_size = std::min(bytes_left, _max_file_block_size);
        bytes_left -= piece_size;

        if (!try_reserve(hash, context, pos, piece_size, cache_lock)) {
            state = FileBlock::State::SKIP_CACHE;
        }

        if (state == FileBlock::State::SKIP_CACHE) [[unlikely]] {
            // No space: hand out a block that is not registered in the cache.
            FileCacheKey key;
            key.hash = hash;
            key.offset = pos;
            key.meta.type = context.cache_type;
            key.meta.expiration_time = context.expiration_time;
            key.meta.tablet_id = context.tablet_id;
            file_blocks.push_back(std::make_shared<FileBlock>(key, piece_size, this,
                                                              FileBlock::State::SKIP_CACHE));
        } else {
            if (auto* cell = add_cell(hash, context, pos, piece_size, state, cache_lock)) {
                file_blocks.push_back(cell->file_block);
                if (!context.is_cold_data) {
                    cell->update_atime();
                }
            }
            if (_ttl_mgr && context.tablet_id != 0) {
                _ttl_mgr->register_tablet_id(context.tablet_id);
            }
        }

        pos += piece_size;
    }

    DCHECK(file_blocks.empty() || offset + size - 1 == file_blocks.back()->range().right);
    return file_blocks;
}
815
816
void BlockFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks,
817
                                                       const UInt128Wrapper& hash,
818
                                                       const CacheContext& context,
819
                                                       const FileBlock::Range& range,
820
712k
                                                       std::lock_guard<std::mutex>& cache_lock) {
821
    /// There are blocks [block1, ..., blockN]
822
    /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
823
    /// intersect with given range.
824
825
    /// It can have holes:
826
    /// [____________________]         -- requested range
827
    ///     [____]  [_]   [_________]  -- intersecting cache [block1, ..., blockN]
828
    ///
829
    /// For each such hole create a cell with file block state EMPTY.
830
831
712k
    auto it = file_blocks.begin();
832
712k
    auto block_range = (*it)->range();
833
834
712k
    size_t current_pos = 0;
835
712k
    if (block_range.left < range.left) {
836
        ///    [_______     -- requested range
837
        /// [_______
838
        /// ^
839
        /// block1
840
841
151
        current_pos = block_range.right + 1;
842
151
        ++it;
843
712k
    } else {
844
712k
        current_pos = range.left;
845
712k
    }
846
847
1.42M
    while (current_pos <= range.right && it != file_blocks.end()) {
848
712k
        block_range = (*it)->range();
849
850
712k
        if (current_pos == block_range.left) {
851
712k
            current_pos = block_range.right + 1;
852
712k
            ++it;
853
712k
            continue;
854
712k
        }
855
856
712k
        DCHECK(current_pos < block_range.left);
857
858
109
        auto hole_size = block_range.left - current_pos;
859
860
109
        file_blocks.splice(it, split_range_into_cells(hash, context, current_pos, hole_size,
861
109
                                                      FileBlock::State::EMPTY, cache_lock));
862
863
109
        current_pos = block_range.right + 1;
864
109
        ++it;
865
109
    }
866
867
712k
    if (current_pos <= range.right) {
868
        ///   ________]     -- requested range
869
        ///   _____]
870
        ///        ^
871
        /// blockN
872
873
86
        auto hole_size = range.right - current_pos + 1;
874
875
86
        file_blocks.splice(file_blocks.end(),
876
86
                           split_range_into_cells(hash, context, current_pos, hole_size,
877
86
                                                  FileBlock::State::EMPTY, cache_lock));
878
86
    }
879
712k
}
880
881
// Returns a holder with file blocks covering [offset, offset + size - 1] for `hash`.
// Blocks already cached for the range are reused and every uncovered part is filled with
// newly created EMPTY blocks. `context.stats` must be non-null; its lock-wait, get and set
// timers are updated. Read/hit metrics are updated per returned block.
//
// When config::enable_file_cache_async_touch_on_get_or_set is on, DOWNLOADED blocks that
// qualify for an LRU move are collected under the lock and queued for a deferred LRU
// update after the lock is released.
FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
                                            CacheContext& context) {
    const FileBlock::Range range(offset, offset + size - 1);

    ReadStatistics* stats = context.stats;
    DCHECK(stats != nullptr);
    MonotonicStopWatch lock_wait_watch;
    lock_wait_watch.start();

    FileBlocks blocks;
    std::vector<FileBlockSPtr> deferred_touches;
    const bool defer_lru_touch = config::enable_file_cache_async_touch_on_get_or_set;
    int64_t locked_duration = 0; // time spent holding the cache lock
    {
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->increment();
        std::lock_guard cache_lock(_mutex);
        ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set_wait_lock->decrement();
        stats->lock_wait_timer += lock_wait_watch.elapsed_time();
        SCOPED_RAW_TIMER(&locked_duration);

        /// Get all blocks which intersect with the given range.
        {
            SCOPED_RAW_TIMER(&stats->get_timer);
            blocks = get_impl(hash, context, range, cache_lock);
        }

        if (blocks.empty()) {
            // Nothing cached for the range: create blocks for all of it.
            SCOPED_RAW_TIMER(&stats->set_timer);
            blocks = split_range_into_cells(hash, context, offset, size, FileBlock::State::EMPTY,
                                            cache_lock);
        } else {
            // Partially cached: only the gaps need new blocks.
            SCOPED_RAW_TIMER(&stats->set_timer);
            fill_holes_with_empty_file_blocks(blocks, hash, context, range, cache_lock);
        }
        DCHECK(!blocks.empty());

        *_num_read_blocks << blocks.size();
        if (!context.is_warmup) {
            *_no_warmup_num_read_blocks << blocks.size();
        }
        if (defer_lru_touch) {
            deferred_touches.reserve(blocks.size());
        }

        for (auto& block : blocks) {
            const size_t block_size = block->range().size();
            *_total_read_size_metrics << block_size;
            if (block->state_unsafe() != FileBlock::State::DOWNLOADED) {
                continue;
            }
            *_num_hit_blocks << 1;
            *_total_hit_size_metrics << block_size;
            if (!context.is_warmup) {
                *_no_warmup_num_hit_blocks << 1;
            }
            if (defer_lru_touch && need_to_move(block->cache_type(), context.cache_type)) {
                deferred_touches.emplace_back(block);
            }
        }
    }

    // Filled only when deferred touching is enabled; handed over outside the cache lock.
    for (auto& block : deferred_touches) {
        add_need_update_lru_block(std::move(block));
    }

    *_get_or_set_latency_us << (locked_duration / 1000);
    return FileBlocksHolder(std::move(blocks));
}
947
948
/// Creates a file block cell and puts it in the `_files` map by [hash][offset].
///
/// Returns a pointer to the cell, or nullptr when `size` is 0 or exceeds 1GB. When a cell
/// already exists at (hash, offset), that cell is returned unchanged.
/// For a new cell, the block's cache type is adjusted when the context's type and
/// expiration time disagree (TTL without expiration -> NORMAL, non-TTL with expiration
/// -> TTL), the block is appended to the LRU queue of its final type, the addition is
/// recorded in the LRU log, and the size counters are updated.
FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& hash, const CacheContext& context,
                                        size_t offset, size_t size, FileBlock::State state,
                                        std::lock_guard<std::mutex>& cache_lock) {
    if (size == 0) {
        return nullptr; /// Empty files are not cached.
    }

    VLOG_DEBUG << "Adding file block to cache. size=" << size << " hash=" << hash.to_string()
               << " offset=" << offset << " cache_type=" << cache_type_to_string(context.cache_type)
               << " expiration_time=" << context.expiration_time
               << " tablet_id=" << context.tablet_id;

    constexpr size_t max_block_bytes = 1024 * 1024 * 1024;
    if (size > max_block_bytes) {
        LOG(WARNING) << "File block size is too large for a block, reject. size=" << size
                     << " hash=" << hash.to_string() << " offset=" << offset
                     << " stack:" << get_stack_trace();
        return nullptr;
    }

    auto& offsets = _files[hash];
    if (auto existing = offsets.find(offset); existing != offsets.end()) {
        VLOG_DEBUG << "Cache already exists for hash: " << hash.to_string()
                   << ", offset: " << offset << ", size: " << size
                   << ".\nCurrent cache structure: " << dump_structure_unlocked(hash, cache_lock);
        return &(existing->second);
    }

    FileCacheKey key;
    key.hash = hash;
    key.offset = offset;
    key.meta.type = context.cache_type;
    key.meta.expiration_time = context.expiration_time;
    key.meta.tablet_id = context.tablet_id;
    FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state), cache_lock);

    // Reconcile the requested cache type with the expiration time.
    const bool ttl_requested = context.cache_type == FileCacheType::TTL;
    const bool has_expiration = context.expiration_time != 0;
    Status st;
    if (ttl_requested && !has_expiration) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::NORMAL, cache_lock);
    } else if (!ttl_requested && has_expiration) {
        st = cell.file_block->change_cache_type_lock(FileCacheType::TTL, cache_lock);
    }
    if (!st.ok()) {
        // Only logged; the cell is still added with whatever type the block has.
        LOG(WARNING) << "Cannot change cache type. expiration_time=" << context.expiration_time
                     << " cache_type=" << cache_type_to_string(context.cache_type)
                     << " error=" << st.msg();
    }

    auto& lru = get_queue(cell.file_block->cache_type());
    cell.queue_iterator = lru.add(hash, offset, size, cache_lock);
    _lru_recorder->record_queue_event(cell.file_block->cache_type(), CacheLRULogType::ADD,
                                      cell.file_block->get_hash_value(), cell.file_block->offset(),
                                      cell.size());

    if (cell.file_block->cache_type() == FileCacheType::TTL) {
        _cur_ttl_size += cell.size();
    }

    auto inserted_it = offsets.insert(std::make_pair(offset, std::move(cell))).first;
    _cur_cache_size += size;
    return &(inserted_it->second);
}
1009
1010
0
size_t BlockFileCache::try_release() {
1011
0
    SCOPED_CACHE_LOCK(_mutex, this);
1012
0
    std::vector<FileBlockCell*> trash;
1013
0
    for (auto& [hash, blocks] : _files) {
1014
0
        for (auto& [offset, cell] : blocks) {
1015
0
            if (cell.releasable()) {
1016
0
                trash.emplace_back(&cell);
1017
0
            } else {
1018
0
                cell.file_block->set_deleting();
1019
0
            }
1020
0
        }
1021
0
    }
1022
0
    size_t remove_size = 0;
1023
0
    for (auto& cell : trash) {
1024
0
        FileBlockSPtr file_block = cell->file_block;
1025
0
        std::lock_guard lc(cell->file_block->_mutex);
1026
0
        remove_size += file_block->range().size();
1027
0
        remove(file_block, cache_lock, lc);
1028
0
        VLOG_DEBUG << "try_release " << _cache_base_path
1029
0
                   << " hash=" << file_block->get_hash_value().to_string()
1030
0
                   << " offset=" << file_block->offset();
1031
0
    }
1032
0
    *_evict_by_try_release << remove_size;
1033
0
    LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
1034
0
    return trash.size();
1035
0
}
1036
1037
764k
// Returns the LRU queue that holds blocks of cache type `type`.
// An unrecognized type trips a DCHECK and falls back to the normal queue.
LRUQueue& BlockFileCache::get_queue(FileCacheType type) {
    LRUQueue* selected = &_normal_queue;
    switch (type) {
    case FileCacheType::INDEX:
        selected = &_index_queue;
        break;
    case FileCacheType::DISPOSABLE:
        selected = &_disposable_queue;
        break;
    case FileCacheType::NORMAL:
        break;
    case FileCacheType::TTL:
        selected = &_ttl_queue;
        break;
    default:
        DCHECK(false);
        break;
    }
    return *selected;
}
1052
1053
140
// Const overload: returns the LRU queue that holds blocks of cache type `type`.
// An unrecognized type trips a DCHECK and falls back to the normal queue.
const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
    if (type == FileCacheType::INDEX) {
        return _index_queue;
    }
    if (type == FileCacheType::DISPOSABLE) {
        return _disposable_queue;
    }
    if (type == FileCacheType::NORMAL) {
        return _normal_queue;
    }
    if (type == FileCacheType::TTL) {
        return _ttl_queue;
    }
    DCHECK(false);
    return _normal_queue;
}
1068
1069
void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
1070
                                        std::lock_guard<std::mutex>& cache_lock, bool sync,
1071
14.7k
                                        std::string& reason) {
1072
14.7k
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1073
1.35k
        FileBlockSPtr file_block = cell->file_block;
1074
1.35k
        if (file_block) {
1075
1.35k
            std::lock_guard block_lock(file_block->_mutex);
1076
1.35k
            remove(file_block, cache_lock, block_lock, sync);
1077
1.35k
            VLOG_DEBUG << "remove_file_blocks"
1078
0
                       << " hash=" << file_block->get_hash_value().to_string()
1079
0
                       << " offset=" << file_block->offset() << " reason=" << reason;
1080
1.35k
        }
1081
1.35k
    };
1082
14.7k
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1083
14.7k
}
1084
1085
void BlockFileCache::find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
1086
                                           size_t& removed_size,
1087
                                           std::vector<FileBlockCell*>& to_evict,
1088
                                           std::lock_guard<std::mutex>& cache_lock,
1089
1.34k
                                           size_t& cur_removed_size, bool evict_in_advance) {
1090
3.09k
    for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1091
3.09k
        if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1092
1.28k
            break;
1093
1.28k
        }
1094
1.81k
        auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1095
1096
1.81k
        DCHECK(cell) << "Cache became inconsistent. key: " << entry_key.to_string()
1097
0
                     << ", offset: " << entry_offset;
1098
1099
1.81k
        size_t cell_size = cell->size();
1100
1.81k
        DCHECK(entry_size == cell_size);
1101
1102
1.81k
        if (cell->releasable()) {
1103
1.34k
            auto& file_block = cell->file_block;
1104
1105
1.34k
            std::lock_guard block_lock(file_block->_mutex);
1106
1.34k
            DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1107
1.34k
            to_evict.push_back(cell);
1108
1.34k
            removed_size += cell_size;
1109
1.34k
            cur_removed_size += cell_size;
1110
1.34k
        }
1111
1.81k
    }
1112
1.34k
}
1113
1114
// 1. if the async load of the file cache has not finished
1115
//     a. evict from lru queue
1116
// 2. if ttl cache
1117
//     a. evict from disposable/normal/index queue one by one
1118
// 3. if the query limit is not reached or there is no query limit
1119
//     a. evict from other queue
1120
//     b. evict from current queue
1121
//         a.1 if the data belong write, then just evict cold data
1122
// 4. if reach query limit
1123
//     a. evict from query queue
1124
//     b. evict from other queue
1125
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext& context,
1126
                                 size_t offset, size_t size,
1127
13.4k
                                 std::lock_guard<std::mutex>& cache_lock) {
1128
13.4k
    if (!_async_open_done) {
1129
4
        return try_reserve_during_async_load(size, cache_lock);
1130
4
    }
1131
    // use this strategy in scenarios where there is insufficient disk capacity or insufficient number of inodes remaining
1132
    // directly eliminate 5 times the size of the space
1133
13.4k
    if (_disk_resource_limit_mode) {
1134
1
        size = 5 * size;
1135
1
    }
1136
1137
13.4k
    auto query_context = config::enable_file_cache_query_limit &&
1138
13.4k
                                         (context.query_id.hi != 0 || context.query_id.lo != 0)
1139
13.4k
                                 ? get_query_context(context.query_id, cache_lock)
1140
13.4k
                                 : nullptr;
1141
13.4k
    if (!query_context) {
1142
13.4k
        return try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock);
1143
13.4k
    } else if (query_context->get_cache_size(cache_lock) + size <=
1144
31
               query_context->get_max_cache_size()) {
1145
16
        return try_reserve_for_lru(hash, query_context, context, offset, size, cache_lock);
1146
16
    }
1147
15
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1148
15
                               std::chrono::steady_clock::now().time_since_epoch())
1149
15
                               .count();
1150
15
    auto& queue = get_queue(context.cache_type);
1151
15
    size_t removed_size = 0;
1152
15
    size_t ghost_remove_size = 0;
1153
15
    size_t queue_size = queue.get_capacity(cache_lock);
1154
15
    size_t cur_cache_size = _cur_cache_size;
1155
15
    size_t query_context_cache_size = query_context->get_cache_size(cache_lock);
1156
1157
15
    std::vector<LRUQueue::Iterator> ghost;
1158
15
    std::vector<FileBlockCell*> to_evict;
1159
1160
15
    size_t max_size = queue.get_max_size();
1161
48
    auto is_overflow = [&] {
1162
48
        return _disk_resource_limit_mode ? removed_size < size
1163
48
                                         : cur_cache_size + size - removed_size > _capacity ||
1164
48
                                                   (queue_size + size - removed_size > max_size) ||
1165
48
                                                   (query_context_cache_size + size -
1166
38
                                                            (removed_size + ghost_remove_size) >
1167
38
                                                    query_context->get_max_cache_size());
1168
48
    };
1169
1170
    /// Select the cache from the LRU queue held by query for expulsion.
1171
35
    for (auto iter = query_context->queue().begin(); iter != query_context->queue().end(); iter++) {
1172
33
        if (!is_overflow()) {
1173
13
            break;
1174
13
        }
1175
1176
20
        auto* cell = get_cell(iter->hash, iter->offset, cache_lock);
1177
1178
20
        if (!cell) {
1179
            /// The cache corresponding to this record may be swapped out by
1180
            /// other queries, so it has become invalid.
1181
4
            ghost.push_back(iter);
1182
4
            ghost_remove_size += iter->size;
1183
16
        } else {
1184
16
            size_t cell_size = cell->size();
1185
16
            DCHECK(iter->size == cell_size);
1186
1187
16
            if (cell->releasable()) {
1188
16
                auto& file_block = cell->file_block;
1189
1190
16
                std::lock_guard block_lock(file_block->_mutex);
1191
16
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1192
16
                to_evict.push_back(cell);
1193
16
                removed_size += cell_size;
1194
16
            }
1195
16
        }
1196
20
    }
1197
1198
16
    auto remove_file_block_if = [&](FileBlockCell* cell) {
1199
16
        FileBlockSPtr file_block = cell->file_block;
1200
16
        if (file_block) {
1201
16
            query_context->remove(file_block->get_hash_value(), file_block->offset(), cache_lock);
1202
16
            std::lock_guard block_lock(file_block->_mutex);
1203
16
            remove(file_block, cache_lock, block_lock);
1204
16
        }
1205
16
    };
1206
1207
15
    for (auto& iter : ghost) {
1208
4
        query_context->remove(iter->hash, iter->offset, cache_lock);
1209
4
    }
1210
1211
15
    std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
1212
1213
15
    if (is_overflow() &&
1214
15
        !try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock)) {
1215
1
        return false;
1216
1
    }
1217
14
    query_context->reserve(hash, offset, size, cache_lock);
1218
14
    return true;
1219
15
}
1220
1221
1
void BlockFileCache::try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock) {
1222
1
    UInt128Wrapper hash = UInt128Wrapper();
1223
1
    size_t offset = 0;
1224
1
    CacheContext context;
1225
    /* we pick NORMAL and TTL cache to evict in advance
1226
     * we reserve for them but won't acutually give space to them
1227
     * on the contrary, NORMAL and TTL may sacrifice by LRU evicting themselves
1228
     * other cache types cannot be exempted because we will evict what they have stolen before LRU evicting
1229
     * in summary: all cache types will shrink somewhat, and NORMAL and TTL shrink the most, to make sure the cache is not full
1230
     */
1231
1
    context.cache_type = FileCacheType::NORMAL;
1232
1
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1233
1
    context.cache_type = FileCacheType::TTL;
1234
1
    try_reserve_for_lru(hash, nullptr, context, offset, size, cache_lock, true);
1235
1
}
1236
1237
// remove specific cache synchronously, for critical operations
1238
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1239
8
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
1240
8
    std::string reason = "remove_if_cached";
1241
8
    SCOPED_CACHE_LOCK(_mutex, this);
1242
8
    auto iter = _files.find(file_key);
1243
8
    std::vector<FileBlockCell*> to_remove;
1244
8
    if (iter != _files.end()) {
1245
14
        for (auto& [_, cell] : iter->second) {
1246
14
            if (cell.releasable()) {
1247
13
                to_remove.push_back(&cell);
1248
13
            } else {
1249
1
                cell.file_block->set_deleting();
1250
1
            }
1251
14
        }
1252
6
    }
1253
8
    remove_file_blocks(to_remove, cache_lock, true, reason);
1254
8
}
1255
1256
// the async version of remove_if_cached, for background operations
1257
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
1258
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
1259
2
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
1260
2
    std::string reason = "remove_if_cached_async";
1261
2
    SCOPED_CACHE_LOCK(_mutex, this);
1262
1263
2
    auto iter = _files.find(file_key);
1264
2
    std::vector<FileBlockCell*> to_remove;
1265
2
    if (iter != _files.end()) {
1266
2
        for (auto& [_, cell] : iter->second) {
1267
2
            *_gc_evict_bytes_metrics << cell.size();
1268
2
            *_gc_evict_count_metrics << 1;
1269
2
            if (cell.releasable()) {
1270
1
                to_remove.push_back(&cell);
1271
1
            } else {
1272
1
                cell.file_block->set_deleting();
1273
1
            }
1274
2
        }
1275
2
    }
1276
2
    remove_file_blocks(to_remove, cache_lock, false, reason);
1277
2
}
1278
1279
// Returns the cache types other than `cur_cache_type`, never including TTL, in the order
// listed below. An unknown cache type yields an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type_without_ttl(
        FileCacheType cur_cache_type) {
    if (cur_cache_type == FileCacheType::TTL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    }
    if (cur_cache_type == FileCacheType::INDEX) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL};
    }
    if (cur_cache_type == FileCacheType::NORMAL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX};
    }
    if (cur_cache_type == FileCacheType::DISPOSABLE) {
        return {FileCacheType::NORMAL, FileCacheType::INDEX};
    }
    return {};
}
1295
1296
1.25k
// Returns every cache type other than `cur_cache_type` (TTL included), in the order listed
// below. An unknown cache type yields an empty list.
std::vector<FileCacheType> BlockFileCache::get_other_cache_type(FileCacheType cur_cache_type) {
    if (cur_cache_type == FileCacheType::TTL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::INDEX};
    }
    if (cur_cache_type == FileCacheType::INDEX) {
        return {FileCacheType::DISPOSABLE, FileCacheType::NORMAL, FileCacheType::TTL};
    }
    if (cur_cache_type == FileCacheType::NORMAL) {
        return {FileCacheType::DISPOSABLE, FileCacheType::INDEX, FileCacheType::TTL};
    }
    if (cur_cache_type == FileCacheType::DISPOSABLE) {
        return {FileCacheType::NORMAL, FileCacheType::INDEX, FileCacheType::TTL};
    }
    return {};
}
1311
1312
void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size_t old_size,
1313
5
                                 size_t new_size, std::lock_guard<std::mutex>& cache_lock) {
1314
5
    DCHECK(_files.find(hash) != _files.end() &&
1315
5
           _files.find(hash)->second.find(offset) != _files.find(hash)->second.end());
1316
5
    FileBlockCell* cell = get_cell(hash, offset, cache_lock);
1317
5
    DCHECK(cell != nullptr);
1318
5
    if (cell == nullptr) {
1319
0
        LOG(WARNING) << "reset_range skipped because cache cell is missing. hash="
1320
0
                     << hash.to_string() << " offset=" << offset << " old_size=" << old_size
1321
0
                     << " new_size=" << new_size;
1322
0
        return;
1323
0
    }
1324
5
    DCHECK_EQ(cell->file_block->_block_range.size(), old_size);
1325
5
    if (cell->queue_iterator) {
1326
5
        auto& queue = get_queue(cell->file_block->cache_type());
1327
5
        DCHECK(queue.contains(hash, offset, cache_lock));
1328
5
        queue.resize(*cell->queue_iterator, new_size, cache_lock);
1329
5
        _lru_recorder->record_queue_event(cell->file_block->cache_type(), CacheLRULogType::RESIZE,
1330
5
                                          cell->file_block->get_hash_value(),
1331
5
                                          cell->file_block->offset(), new_size);
1332
5
    }
1333
5
    cell->file_block->_block_range.right = cell->file_block->_block_range.left + new_size - 1;
1334
5
    _cur_cache_size -= old_size;
1335
5
    _cur_cache_size += new_size;
1336
5
    if (cell->file_block->cache_type() == FileCacheType::TTL) {
1337
0
        _cur_ttl_size -= old_size;
1338
0
        _cur_ttl_size += new_size;
1339
0
    }
1340
5
}
1341
1342
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1343
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1344
13.4k
        int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1345
13.4k
    size_t removed_size = 0;
1346
13.4k
    size_t cur_cache_size = _cur_cache_size;
1347
13.4k
    std::vector<FileBlockCell*> to_evict;
1348
30.8k
    for (FileCacheType cache_type : other_cache_types) {
1349
30.8k
        auto& queue = get_queue(cache_type);
1350
30.8k
        size_t remove_size_per_type = 0;
1351
30.8k
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1352
1.43k
            if (!is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1353
697
                break;
1354
697
            }
1355
738
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1356
738
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
1357
0
                         << ", offset: " << entry_offset;
1358
1359
738
            size_t cell_size = cell->size();
1360
738
            DCHECK(entry_size == cell_size);
1361
1362
738
            if (cell->atime == 0 ? true : cell->atime + queue.get_hot_data_interval() > cur_time) {
1363
736
                break;
1364
736
            }
1365
1366
2
            if (cell->releasable()) {
1367
2
                auto& file_block = cell->file_block;
1368
2
                std::lock_guard block_lock(file_block->_mutex);
1369
2
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
1370
2
                to_evict.push_back(cell);
1371
2
                removed_size += cell_size;
1372
2
                remove_size_per_type += cell_size;
1373
2
            }
1374
2
        }
1375
30.8k
        *(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1376
30.8k
    }
1377
13.4k
    bool is_sync_removal = !evict_in_advance;
1378
13.4k
    std::string reason = std::string("try_reserve_by_time ") +
1379
13.4k
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1380
13.4k
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1381
1382
13.4k
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1383
13.4k
}
1384
1385
bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
1386
19.2k
                                 bool evict_in_advance) const {
1387
19.2k
    bool ret = false;
1388
19.2k
    if (evict_in_advance) { // we don't need to check _need_evict_cache_in_advance
1389
11
        ret = (removed_size < need_size);
1390
11
        return ret;
1391
11
    }
1392
19.2k
    if (_disk_resource_limit_mode) {
1393
4
        ret = (removed_size < need_size);
1394
19.2k
    } else {
1395
19.2k
        ret = (cur_cache_size + need_size - removed_size > _capacity);
1396
19.2k
    }
1397
19.2k
    return ret;
1398
19.2k
}
1399
1400
bool BlockFileCache::try_reserve_from_other_queue_by_size(
1401
        FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1402
413
        std::lock_guard<std::mutex>& cache_lock, bool evict_in_advance) {
1403
413
    size_t removed_size = 0;
1404
413
    size_t cur_cache_size = _cur_cache_size;
1405
413
    std::vector<FileBlockCell*> to_evict;
1406
    // we follow the privilege defined in get_other_cache_types to evict
1407
1.23k
    for (FileCacheType cache_type : other_cache_types) {
1408
1.23k
        auto& queue = get_queue(cache_type);
1409
1410
        // we will not drain each of them to the bottom -- i.e., we only
1411
        // evict what they have stolen.
1412
1.23k
        size_t cur_queue_size = queue.get_capacity(cache_lock);
1413
1.23k
        size_t cur_queue_max_size = queue.get_max_size();
1414
1.23k
        if (cur_queue_size <= cur_queue_max_size) {
1415
735
            continue;
1416
735
        }
1417
504
        size_t cur_removed_size = 0;
1418
504
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1419
504
                              cur_removed_size, evict_in_advance);
1420
504
        *(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1421
504
    }
1422
413
    bool is_sync_removal = !evict_in_advance;
1423
413
    std::string reason = std::string("try_reserve_by_size") +
1424
413
                         " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1425
413
    remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1426
413
    return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
1427
413
}
1428
1429
bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t size,
1430
                                                  int64_t cur_time,
1431
                                                  std::lock_guard<std::mutex>& cache_lock,
1432
13.4k
                                                  bool evict_in_advance) {
1433
    // currently, TTL cache is not considered as a candidate
1434
13.4k
    auto other_cache_types = get_other_cache_type_without_ttl(cur_cache_type);
1435
13.4k
    bool reserve_success = try_reserve_from_other_queue_by_time_interval(
1436
13.4k
            cur_cache_type, other_cache_types, size, cur_time, cache_lock, evict_in_advance);
1437
13.4k
    if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1438
12.2k
        return reserve_success;
1439
12.2k
    }
1440
1441
1.25k
    other_cache_types = get_other_cache_type(cur_cache_type);
1442
1.25k
    auto& cur_queue = get_queue(cur_cache_type);
1443
1.25k
    size_t cur_queue_size = cur_queue.get_capacity(cache_lock);
1444
1.25k
    size_t cur_queue_max_size = cur_queue.get_max_size();
1445
    // Hit the soft limit by self, cannot remove from other queues
1446
1.25k
    if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1447
838
        return false;
1448
838
    }
1449
413
    return try_reserve_from_other_queue_by_size(cur_cache_type, other_cache_types, size, cache_lock,
1450
413
                                                evict_in_advance);
1451
1.25k
}
1452
1453
bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1454
                                         QueryFileCacheContextPtr query_context,
1455
                                         const CacheContext& context, size_t offset, size_t size,
1456
                                         std::lock_guard<std::mutex>& cache_lock,
1457
13.4k
                                         bool evict_in_advance) {
1458
13.4k
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1459
13.4k
                               std::chrono::steady_clock::now().time_since_epoch())
1460
13.4k
                               .count();
1461
13.4k
    if (!try_reserve_from_other_queue(context.cache_type, size, cur_time, cache_lock,
1462
13.4k
                                      evict_in_advance)) {
1463
839
        auto& queue = get_queue(context.cache_type);
1464
839
        size_t removed_size = 0;
1465
839
        size_t cur_cache_size = _cur_cache_size;
1466
1467
839
        std::vector<FileBlockCell*> to_evict;
1468
839
        size_t cur_removed_size = 0;
1469
839
        find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1470
839
                              cur_removed_size, evict_in_advance);
1471
839
        bool is_sync_removal = !evict_in_advance;
1472
839
        std::string reason = std::string("try_reserve for cache type ") +
1473
839
                             cache_type_to_string(context.cache_type) +
1474
839
                             " evict_in_advance=" + (evict_in_advance ? "true" : "false");
1475
839
        remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
1476
839
        *(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;
1477
1478
839
        if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
1479
61
            return false;
1480
61
        }
1481
839
    }
1482
1483
13.4k
    if (query_context) {
1484
16
        query_context->reserve(hash, offset, size, cache_lock);
1485
16
    }
1486
13.4k
    return true;
1487
13.4k
}
1488
1489
template <class T, class U>
1490
    requires IsXLock<T> && IsXLock<U>
1491
3.42k
void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lock, bool sync) {
1492
3.42k
    auto hash = file_block->get_hash_value();
1493
3.42k
    auto offset = file_block->offset();
1494
3.42k
    auto type = file_block->cache_type();
1495
3.42k
    auto expiration_time = file_block->expiration_time();
1496
3.42k
    auto tablet_id = file_block->tablet_id();
1497
3.42k
    auto* cell = get_cell(hash, offset, cache_lock);
1498
3.42k
    file_block->cell = nullptr;
1499
    // Holder cleanup can race with prior cache metadata cleanup. In that case,
1500
    // skip the duplicate remove instead of touching a detached or replaced cell.
1501
3.42k
    if (cell == nullptr) {
1502
1
        LOG(WARNING) << "remove skipped because cache cell is missing. hash=" << hash.to_string()
1503
1
                     << " offset=" << offset << " size=" << file_block->range().size()
1504
1
                     << " type=" << cache_type_to_string(type)
1505
1
                     << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1506
1
                     << " expiration_time=" << expiration_time << " sync=" << sync;
1507
1
        return;
1508
1
    }
1509
3.41k
    if (cell->file_block.get() != file_block.get()) {
1510
1
        auto* cell_file_block = cell->file_block.get();
1511
1
        LOG(WARNING)
1512
1
                << "remove skipped because cache cell points to a different file block. hash="
1513
1
                << hash.to_string() << " offset=" << offset
1514
1
                << " size=" << file_block->range().size() << " type=" << cache_type_to_string(type)
1515
1
                << " state=" << FileBlock::state_to_string(file_block->state_unsafe())
1516
1
                << " expiration_time=" << expiration_time << " sync=" << sync << " cell_block_hash="
1517
1
                << (cell_file_block ? cell_file_block->get_hash_value().to_string() : "<null>")
1518
1
                << " cell_block_offset="
1519
1
                << (cell_file_block ? std::to_string(cell_file_block->offset()) : "<null>")
1520
1
                << " cell_block_size="
1521
1
                << (cell_file_block ? std::to_string(cell_file_block->range().size()) : "<null>")
1522
1
                << " cell_block_type="
1523
1
                << (cell_file_block ? cache_type_to_string(cell_file_block->cache_type())
1524
1
                                    : "<null>")
1525
1
                << " cell_block_state="
1526
1
                << (cell_file_block ? FileBlock::state_to_string(cell_file_block->state_unsafe())
1527
1
                                    : "<null>");
1528
1
        return;
1529
1
    }
1530
3.41k
    DCHECK(cell->queue_iterator);
1531
3.41k
    if (cell->queue_iterator) {
1532
3.41k
        auto& queue = get_queue(file_block->cache_type());
1533
3.41k
        queue.remove(*cell->queue_iterator, cache_lock);
1534
3.41k
        _lru_recorder->record_queue_event(file_block->cache_type(), CacheLRULogType::REMOVE,
1535
3.41k
                                          cell->file_block->get_hash_value(),
1536
3.41k
                                          cell->file_block->offset(), cell->size());
1537
3.41k
    }
1538
3.41k
    *_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
1539
3.41k
            << file_block->range().size();
1540
3.41k
    *_total_evict_size_metrics << file_block->range().size();
1541
1542
3.41k
    VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
1543
0
               << ", offset: " << offset << ", size: " << file_block->range().size()
1544
0
               << ", type: " << cache_type_to_string(type);
1545
1546
3.41k
    if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
1547
1.42k
        FileCacheKey key;
1548
1.42k
        key.hash = hash;
1549
1.42k
        key.offset = offset;
1550
1.42k
        key.meta.type = type;
1551
1.42k
        key.meta.expiration_time = expiration_time;
1552
1.42k
        key.meta.tablet_id = tablet_id;
1553
1.42k
        if (sync) {
1554
1.38k
            int64_t duration_ns = 0;
1555
1.38k
            Status st;
1556
1.38k
            {
1557
1.38k
                SCOPED_RAW_TIMER(&duration_ns);
1558
1.38k
                st = _storage->remove(key);
1559
1.38k
            }
1560
1.38k
            *_storage_sync_remove_latency_us << (duration_ns / 1000);
1561
1.38k
            if (!st.ok()) {
1562
0
                LOG_WARNING("").error(st);
1563
0
            }
1564
1.38k
        } else {
1565
            // the file will be deleted in the bottom half
1566
            // so there will be a window that the file is not in the cache but still in the storage
1567
            // but it's ok, because the rowset is stale already
1568
46
            bool ret = _recycle_keys.enqueue(key);
1569
46
            if (ret) [[likely]] {
1570
46
                *_recycle_keys_length_recorder << _recycle_keys.size_approx();
1571
46
            } else {
1572
0
                LOG_WARNING("Failed to push recycle key to queue, do it synchronously");
1573
0
                int64_t duration_ns = 0;
1574
0
                Status st;
1575
0
                {
1576
0
                    SCOPED_RAW_TIMER(&duration_ns);
1577
0
                    st = _storage->remove(key);
1578
0
                }
1579
0
                *_storage_retry_sync_remove_latency_us << (duration_ns / 1000);
1580
0
                if (!st.ok()) {
1581
0
                    LOG_WARNING("").error(st);
1582
0
                }
1583
0
            }
1584
46
        }
1585
1.99k
    } else if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADING) {
1586
0
        file_block->set_deleting();
1587
0
        return;
1588
0
    }
1589
3.41k
    _cur_cache_size -= file_block->range().size();
1590
3.41k
    if (FileCacheType::TTL == type) {
1591
1.29k
        _cur_ttl_size -= file_block->range().size();
1592
1.29k
    }
1593
3.41k
    auto it = _files.find(hash);
1594
3.41k
    if (it != _files.end()) {
1595
3.41k
        it->second.erase(file_block->offset());
1596
3.41k
        if (it->second.empty()) {
1597
1.34k
            _files.erase(hash);
1598
1.34k
        }
1599
3.41k
    }
1600
3.41k
    *_num_removed_blocks << 1;
1601
3.41k
}
1602
1603
46
// Locking wrapper around get_used_cache_size_unlocked(): takes the cache lock and returns
// the used size of the queue for `cache_type`.
size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t used = get_used_cache_size_unlocked(cache_type, cache_lock);
    return used;
}
1607
1608
size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type,
1609
46
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1610
46
    return get_queue(cache_type).get_capacity(cache_lock);
1611
46
}
1612
1613
0
// Locking wrapper around get_available_cache_size_unlocked(): takes the cache lock and
// returns the available size of the queue for `cache_type`.
size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t available = get_available_cache_size_unlocked(cache_type, cache_lock);
    return available;
}
1617
1618
size_t BlockFileCache::get_available_cache_size_unlocked(
1619
0
        FileCacheType cache_type, std::lock_guard<std::mutex>& cache_lock) const {
1620
0
    return get_queue(cache_type).get_max_element_size() -
1621
0
           get_used_cache_size_unlocked(cache_type, cache_lock);
1622
0
}
1623
1624
91
// Locking wrapper around get_file_blocks_num_unlocked(): takes the cache lock and returns
// the number of elements in the queue for `cache_type`.
size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const {
    SCOPED_CACHE_LOCK(_mutex, this);
    const size_t block_count = get_file_blocks_num_unlocked(cache_type, cache_lock);
    return block_count;
}
1628
1629
size_t BlockFileCache::get_file_blocks_num_unlocked(FileCacheType cache_type,
1630
91
                                                    std::lock_guard<std::mutex>& cache_lock) const {
1631
91
    return get_queue(cache_type).get_elements_num(cache_lock);
1632
91
}
1633
1634
// Creates the cell for `file_block` and points the block back at this cell.
FileBlockCell::FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock)
        : file_block(file_block) {
    file_block->cell = this;
    /**
     * Cell can be created with either DOWNLOADED or EMPTY file block's state.
     * File block acquires DOWNLOADING state and creates LRUQueue iterator on first
     * successful getOrSetDownloader call.
     */
    const FileBlock::State initial_state = file_block->_download_state;
    const bool state_allowed = initial_state == FileBlock::State::DOWNLOADED ||
                               initial_state == FileBlock::State::EMPTY ||
                               initial_state == FileBlock::State::SKIP_CACHE;
    if (!state_allowed) {
        DCHECK(false) << "Can create cell with either EMPTY, DOWNLOADED, SKIP_CACHE state, got: "
                      << FileBlock::state_to_string(file_block->_download_state);
    }

    // Cells of TTL blocks get their access time set at creation.
    if (file_block->cache_type() == FileCacheType::TTL) {
        update_atime();
    }
}
1657
1658
// Appends an entry for (hash, offset, size) at the end of the queue, indexes it in `map`
// and adds `size` to the tracked cache size. Returns the iterator to the new entry.
LRUQueue::Iterator LRUQueue::add(const UInt128Wrapper& hash, size_t offset, size_t size,
                                 std::lock_guard<std::mutex>& /* cache_lock */) {
    cache_size += size;
    auto position = queue.insert(queue.end(), FileKeyAndOffset(hash, offset, size));
    auto lookup_key = std::make_pair(hash, offset);
    map.insert(std::make_pair(lookup_key, position));
    return position;
}
1665
1666
0
void LRUQueue::remove_all(std::lock_guard<std::mutex>& /* cache_lock */) {
1667
0
    queue.clear();
1668
0
    map.clear();
1669
0
    cache_size = 0;
1670
0
}
1671
1672
1.03M
// Moves the entry at `queue_it` to the end of the queue by splicing it within the same list.
void LRUQueue::move_to_end(Iterator queue_it, std::lock_guard<std::mutex>& /* cache_lock */) {
    auto& entries = queue;
    entries.splice(entries.end(), entries, queue_it);
}
1675
1676
void LRUQueue::resize(Iterator queue_it, size_t new_size,
1677
9
                      std::lock_guard<std::mutex>& /* cache_lock */) {
1678
9
    cache_size -= queue_it->size;
1679
9
    queue_it->size = new_size;
1680
9
    cache_size += new_size;
1681
9
}
1682
bool LRUQueue::contains(const UInt128Wrapper& hash, size_t offset,
1683
5
                        std::lock_guard<std::mutex>& /* cache_lock */) const {
1684
5
    return map.find(std::make_pair(hash, offset)) != map.end();
1685
5
}
1686
1687
// Looks up the queue iterator registered for (hash, offset).
// When no entry is registered, a value-initialized list iterator is returned instead.
LRUQueue::Iterator LRUQueue::get(const UInt128Wrapper& hash, size_t offset,
                                 std::lock_guard<std::mutex>& /* cache_lock */) const {
    const auto pos = map.find(std::make_pair(hash, offset));
    if (pos == map.end()) {
        return std::list<FileKeyAndOffset>::iterator();
    }
    return pos->second;
}
1695
1696
2
std::string LRUQueue::to_string(std::lock_guard<std::mutex>& /* cache_lock */) const {
1697
2
    std::string result;
1698
18
    for (const auto& [hash, offset, size] : queue) {
1699
18
        if (!result.empty()) {
1700
16
            result += ", ";
1701
16
        }
1702
18
        result += fmt::format("{}: [{}, {}]", hash.to_string(), offset, offset + size - 1);
1703
18
    }
1704
2
    return result;
1705
2
}
1706
1707
size_t LRUQueue::levenshtein_distance_from(LRUQueue& base,
1708
5
                                           std::lock_guard<std::mutex>& cache_lock) {
1709
5
    std::list<FileKeyAndOffset> target_queue = this->queue;
1710
5
    std::list<FileKeyAndOffset> base_queue = base.queue;
1711
5
    std::vector<FileKeyAndOffset> vec1(target_queue.begin(), target_queue.end());
1712
5
    std::vector<FileKeyAndOffset> vec2(base_queue.begin(), base_queue.end());
1713
1714
5
    size_t m = vec1.size();
1715
5
    size_t n = vec2.size();
1716
1717
    // Create a 2D vector (matrix) to store the Levenshtein distances
1718
    // dp[i][j] will hold the distance between the first i elements of vec1 and the first j elements of vec2
1719
5
    std::vector<std::vector<size_t>> dp(m + 1, std::vector<size_t>(n + 1, 0));
1720
1721
    // Initialize the first row and column of the matrix
1722
    // The distance between an empty list and a list of length k is k (all insertions or deletions)
1723
22
    for (size_t i = 0; i <= m; ++i) {
1724
17
        dp[i][0] = i;
1725
17
    }
1726
20
    for (size_t j = 0; j <= n; ++j) {
1727
15
        dp[0][j] = j;
1728
15
    }
1729
1730
    // Fill the matrix using dynamic programming
1731
17
    for (size_t i = 1; i <= m; ++i) {
1732
38
        for (size_t j = 1; j <= n; ++j) {
1733
            // Check if the current elements of both vectors are equal
1734
26
            size_t cost = (vec1[i - 1].hash == vec2[j - 1].hash &&
1735
26
                           vec1[i - 1].offset == vec2[j - 1].offset)
1736
26
                                  ? 0
1737
26
                                  : 1;
1738
            // Calculate the minimum cost of three possible operations:
1739
            // 1. Insertion: dp[i][j-1] + 1
1740
            // 2. Deletion: dp[i-1][j] + 1
1741
            // 3. Substitution: dp[i-1][j-1] + cost (0 if elements are equal, 1 if not)
1742
26
            dp[i][j] = std::min({dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost});
1743
26
        }
1744
12
    }
1745
    // The bottom-right cell of the matrix contains the Levenshtein distance
1746
5
    return dp[m][n];
1747
5
}
1748
1749
1
std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) {
1750
1
    SCOPED_CACHE_LOCK(_mutex, this);
1751
1
    return dump_structure_unlocked(hash, cache_lock);
1752
1
}
1753
1754
std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
1755
1
                                                    std::lock_guard<std::mutex>&) {
1756
1
    std::stringstream result;
1757
1
    auto it = _files.find(hash);
1758
1
    if (it == _files.end()) {
1759
0
        return std::string("");
1760
0
    }
1761
1
    const auto& cells_by_offset = it->second;
1762
1763
1
    for (const auto& [_, cell] : cells_by_offset) {
1764
1
        result << cell.file_block->get_info_for_log() << " "
1765
1
               << cache_type_to_string(cell.file_block->cache_type()) << "\n";
1766
1
    }
1767
1768
1
    return result.str();
1769
1
}
1770
1771
0
std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
1772
0
    SCOPED_CACHE_LOCK(_mutex, this);
1773
0
    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
1774
0
}
1775
1776
std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
1777
                                                            size_t offset,
1778
0
                                                            std::lock_guard<std::mutex>&) {
1779
0
    std::stringstream result;
1780
0
    auto it = _files.find(hash);
1781
0
    if (it == _files.end()) {
1782
0
        return std::string("");
1783
0
    }
1784
0
    const auto& cells_by_offset = it->second;
1785
0
    const auto& cell = cells_by_offset.find(offset);
1786
1787
0
    return cache_type_to_string(cell->second.file_block->cache_type());
1788
0
}
1789
1790
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t offset,
1791
                                       FileCacheType new_type,
1792
9
                                       std::lock_guard<std::mutex>& cache_lock) {
1793
9
    if (auto iter = _files.find(hash); iter != _files.end()) {
1794
9
        auto& file_blocks = iter->second;
1795
9
        if (auto cell_it = file_blocks.find(offset); cell_it != file_blocks.end()) {
1796
8
            FileBlockCell& cell = cell_it->second;
1797
8
            auto& cur_queue = get_queue(cell.file_block->cache_type());
1798
8
            DCHECK(cell.queue_iterator.has_value());
1799
8
            cur_queue.remove(*cell.queue_iterator, cache_lock);
1800
8
            _lru_recorder->record_queue_event(
1801
8
                    cell.file_block->cache_type(), CacheLRULogType::REMOVE,
1802
8
                    cell.file_block->get_hash_value(), cell.file_block->offset(), cell.size());
1803
8
            auto& new_queue = get_queue(new_type);
1804
8
            cell.queue_iterator =
1805
8
                    new_queue.add(hash, offset, cell.file_block->range().size(), cache_lock);
1806
8
            _lru_recorder->record_queue_event(new_type, CacheLRULogType::ADD,
1807
8
                                              cell.file_block->get_hash_value(),
1808
8
                                              cell.file_block->offset(), cell.size());
1809
8
        }
1810
9
    }
1811
9
}
1812
1813
// @brief: get a path's disk capacity used percent, inode used percent
1814
// @param: path
1815
// @param: percent.first disk used percent, percent.second inode used percent
1816
160
int disk_used_percentage(const std::string& path, std::pair<int, int>* percent) {
1817
160
    struct statfs stat;
1818
160
    int ret = statfs(path.c_str(), &stat);
1819
160
    if (ret != 0) {
1820
2
        return ret;
1821
2
    }
1822
    // https://github.com/coreutils/coreutils/blob/master/src/df.c#L1195
1823
    // v->used = stat.f_blocks - stat.f_bfree
1824
    // nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail
1825
158
    uintmax_t u100 = (stat.f_blocks - stat.f_bfree) * 100;
1826
158
    uintmax_t nonroot_total = stat.f_blocks - stat.f_bfree + stat.f_bavail;
1827
158
    int capacity_percentage = int(u100 / nonroot_total + (u100 % nonroot_total != 0));
1828
1829
158
    unsigned long long inode_free = stat.f_ffree;
1830
158
    unsigned long long inode_total = stat.f_files;
1831
158
    int inode_percentage = cast_set<int>(inode_free * 100 / inode_total);
1832
158
    percent->first = capacity_percentage;
1833
158
    percent->second = 100 - inode_percentage;
1834
1835
    // Add sync point for testing
1836
158
    TEST_SYNC_POINT_CALLBACK("BlockFileCache::disk_used_percentage:1", percent);
1837
1838
158
    return 0;
1839
160
}
1840
1841
10
std::string BlockFileCache::reset_capacity(size_t new_capacity) {
1842
10
    using namespace std::chrono;
1843
10
    int64_t space_released = 0;
1844
10
    size_t old_capacity = 0;
1845
10
    std::stringstream ss;
1846
10
    ss << "finish reset_capacity, path=" << _cache_base_path;
1847
10
    auto adjust_start_time = steady_clock::time_point();
1848
10
    {
1849
10
        SCOPED_CACHE_LOCK(_mutex, this);
1850
10
        if (new_capacity < _capacity && new_capacity < _cur_cache_size) {
1851
1
            int64_t need_remove_size = _cur_cache_size - new_capacity;
1852
4
            auto remove_blocks = [&](LRUQueue& queue) -> int64_t {
1853
4
                int64_t queue_released = 0;
1854
4
                std::vector<FileBlockCell*> to_evict;
1855
13
                for (const auto& [entry_key, entry_offset, entry_size] : queue) {
1856
13
                    if (need_remove_size <= 0) {
1857
1
                        break;
1858
1
                    }
1859
12
                    need_remove_size -= entry_size;
1860
12
                    space_released += entry_size;
1861
12
                    queue_released += entry_size;
1862
12
                    auto* cell = get_cell(entry_key, entry_offset, cache_lock);
1863
12
                    if (!cell->releasable()) {
1864
0
                        cell->file_block->set_deleting();
1865
0
                        continue;
1866
0
                    }
1867
12
                    to_evict.push_back(cell);
1868
12
                }
1869
12
                for (auto& cell : to_evict) {
1870
12
                    FileBlockSPtr file_block = cell->file_block;
1871
12
                    std::lock_guard block_lock(file_block->_mutex);
1872
12
                    remove(file_block, cache_lock, block_lock);
1873
12
                }
1874
4
                return queue_released;
1875
4
            };
1876
1
            int64_t queue_released = remove_blocks(_disposable_queue);
1877
1
            ss << " disposable_queue released " << queue_released;
1878
1
            queue_released = remove_blocks(_normal_queue);
1879
1
            ss << " normal_queue released " << queue_released;
1880
1
            queue_released = remove_blocks(_index_queue);
1881
1
            ss << " index_queue released " << queue_released;
1882
1
            queue_released = remove_blocks(_ttl_queue);
1883
1
            ss << " ttl_queue released " << queue_released;
1884
1885
1
            _disk_resource_limit_mode = true;
1886
1
            _disk_limit_mode_metrics->set_value(1);
1887
1
            ss << " total_space_released=" << space_released;
1888
1
        }
1889
10
        old_capacity = _capacity;
1890
10
        _capacity = new_capacity;
1891
10
        _cache_capacity_metrics->set_value(_capacity);
1892
10
    }
1893
10
    auto use_time = duration_cast<milliseconds>(steady_clock::time_point() - adjust_start_time);
1894
10
    LOG(INFO) << "Finish tag deleted block. path=" << _cache_base_path
1895
10
              << " use_time=" << cast_set<int64_t>(use_time.count());
1896
10
    ss << " old_capacity=" << old_capacity << " new_capacity=" << new_capacity;
1897
10
    LOG(INFO) << ss.str();
1898
10
    return ss.str();
1899
10
}
1900
1901
170
void BlockFileCache::check_disk_resource_limit() {
1902
170
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1903
24
        return;
1904
24
    }
1905
1906
146
    bool previous_mode = _disk_resource_limit_mode;
1907
146
    if (_capacity > _cur_cache_size) {
1908
144
        _disk_resource_limit_mode = false;
1909
144
        _disk_limit_mode_metrics->set_value(0);
1910
144
    }
1911
146
    std::pair<int, int> percent;
1912
146
    int ret = disk_used_percentage(_cache_base_path, &percent);
1913
146
    if (ret != 0) {
1914
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1915
1
        return;
1916
1
    }
1917
145
    auto [space_percentage, inode_percentage] = percent;
1918
290
    auto is_insufficient = [](const int& percentage) {
1919
290
        return percentage >= config::file_cache_enter_disk_resource_limit_mode_percent;
1920
290
    };
1921
145
    DCHECK_GE(space_percentage, 0);
1922
145
    DCHECK_LE(space_percentage, 100);
1923
145
    DCHECK_GE(inode_percentage, 0);
1924
145
    DCHECK_LE(inode_percentage, 100);
1925
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1926
    // FIXME: reject with config validator
1927
145
    if (config::file_cache_enter_disk_resource_limit_mode_percent <
1928
145
        config::file_cache_exit_disk_resource_limit_mode_percent) {
1929
1
        LOG_WARNING("config error, set to default value")
1930
1
                .tag("enter", config::file_cache_enter_disk_resource_limit_mode_percent)
1931
1
                .tag("exit", config::file_cache_exit_disk_resource_limit_mode_percent);
1932
1
        config::file_cache_enter_disk_resource_limit_mode_percent = 88;
1933
1
        config::file_cache_exit_disk_resource_limit_mode_percent = 80;
1934
1
    }
1935
145
    bool is_space_insufficient = is_insufficient(space_percentage);
1936
145
    bool is_inode_insufficient = is_insufficient(inode_percentage);
1937
145
    if (is_space_insufficient || is_inode_insufficient) {
1938
1
        _disk_resource_limit_mode = true;
1939
1
        _disk_limit_mode_metrics->set_value(1);
1940
144
    } else if (_disk_resource_limit_mode &&
1941
144
               (space_percentage < config::file_cache_exit_disk_resource_limit_mode_percent) &&
1942
144
               (inode_percentage < config::file_cache_exit_disk_resource_limit_mode_percent)) {
1943
0
        _disk_resource_limit_mode = false;
1944
0
        _disk_limit_mode_metrics->set_value(0);
1945
0
    }
1946
145
    if (previous_mode != _disk_resource_limit_mode) {
1947
        // add log for disk resource limit mode switching
1948
2
        if (_disk_resource_limit_mode) {
1949
1
            LOG(WARNING) << "Entering disk resource limit mode: file_cache=" << get_base_path()
1950
1
                         << " space_percent=" << space_percentage
1951
1
                         << " inode_percent=" << inode_percentage
1952
1
                         << " is_space_insufficient=" << is_space_insufficient
1953
1
                         << " is_inode_insufficient=" << is_inode_insufficient
1954
1
                         << " enter threshold="
1955
1
                         << config::file_cache_enter_disk_resource_limit_mode_percent;
1956
1
        } else {
1957
1
            LOG(INFO) << "Exiting disk resource limit mode: file_cache=" << get_base_path()
1958
1
                      << " space_percent=" << space_percentage
1959
1
                      << " inode_percent=" << inode_percentage << " exit threshold="
1960
1
                      << config::file_cache_exit_disk_resource_limit_mode_percent;
1961
1
        }
1962
143
    } else if (_disk_resource_limit_mode) {
1963
        // print log for disk resource limit mode running, but less frequently
1964
0
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
1965
0
                                 << " space_percent=" << space_percentage
1966
0
                                 << " inode_percent=" << inode_percentage
1967
0
                                 << " is_space_insufficient=" << is_space_insufficient
1968
0
                                 << " is_inode_insufficient=" << is_inode_insufficient
1969
0
                                 << " mode run in resource limit";
1970
0
    }
1971
145
}
1972
1973
15
void BlockFileCache::check_need_evict_cache_in_advance() {
1974
15
    if (_storage->get_type() != FileCacheStorageType::DISK) {
1975
1
        return;
1976
1
    }
1977
1978
14
    std::pair<int, int> percent;
1979
14
    int ret = disk_used_percentage(_cache_base_path, &percent);
1980
14
    if (ret != 0) {
1981
1
        LOG_ERROR("").tag("file cache path", _cache_base_path).tag("error", strerror(errno));
1982
1
        return;
1983
1
    }
1984
13
    auto [space_percentage, inode_percentage] = percent;
1985
13
    int size_percentage = static_cast<int>(_cur_cache_size * 100 / _capacity);
1986
39
    auto is_insufficient = [](const int& percentage) {
1987
39
        return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
1988
39
    };
1989
13
    DCHECK_GE(space_percentage, 0);
1990
13
    DCHECK_LE(space_percentage, 100);
1991
13
    DCHECK_GE(inode_percentage, 0);
1992
13
    DCHECK_LE(inode_percentage, 100);
1993
    // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1994
    // FIXME: reject with config validator
1995
13
    if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
1996
13
        config::file_cache_exit_need_evict_cache_in_advance_percent) {
1997
1
        LOG_WARNING("config error, set to default value")
1998
1
                .tag("enter", config::file_cache_enter_need_evict_cache_in_advance_percent)
1999
1
                .tag("exit", config::file_cache_exit_need_evict_cache_in_advance_percent);
2000
1
        config::file_cache_enter_need_evict_cache_in_advance_percent = 78;
2001
1
        config::file_cache_exit_need_evict_cache_in_advance_percent = 75;
2002
1
    }
2003
13
    bool previous_mode = _need_evict_cache_in_advance;
2004
13
    bool is_space_insufficient = is_insufficient(space_percentage);
2005
13
    bool is_inode_insufficient = is_insufficient(inode_percentage);
2006
13
    bool is_size_insufficient = is_insufficient(size_percentage);
2007
13
    if (is_space_insufficient || is_inode_insufficient || is_size_insufficient) {
2008
9
        _need_evict_cache_in_advance = true;
2009
9
        _need_evict_cache_in_advance_metrics->set_value(1);
2010
9
    } else if (_need_evict_cache_in_advance &&
2011
4
               (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2012
4
               (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
2013
4
               (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
2014
1
        _need_evict_cache_in_advance = false;
2015
1
        _need_evict_cache_in_advance_metrics->set_value(0);
2016
1
    }
2017
13
    if (previous_mode != _need_evict_cache_in_advance) {
2018
        // add log for evict cache in advance mode switching
2019
6
        if (_need_evict_cache_in_advance) {
2020
5
            LOG(WARNING) << "Entering evict cache in advance mode: "
2021
5
                         << "file_cache=" << get_base_path()
2022
5
                         << " space_percent=" << space_percentage
2023
5
                         << " inode_percent=" << inode_percentage
2024
5
                         << " size_percent=" << size_percentage
2025
5
                         << " is_space_insufficient=" << is_space_insufficient
2026
5
                         << " is_inode_insufficient=" << is_inode_insufficient
2027
5
                         << " is_size_insufficient=" << is_size_insufficient << " enter threshold="
2028
5
                         << config::file_cache_enter_need_evict_cache_in_advance_percent;
2029
5
        } else {
2030
1
            LOG(INFO) << "Exiting evict cache in advance mode: "
2031
1
                      << "file_cache=" << get_base_path() << " space_percent=" << space_percentage
2032
1
                      << " inode_percent=" << inode_percentage
2033
1
                      << " size_percent=" << size_percentage << " exit threshold="
2034
1
                      << config::file_cache_exit_need_evict_cache_in_advance_percent;
2035
1
        }
2036
7
    } else if (_need_evict_cache_in_advance) {
2037
        // print log for evict cache in advance mode running, but less frequently
2038
4
        LOG_EVERY_N(WARNING, 10) << "file_cache=" << get_base_path()
2039
1
                                 << " space_percent=" << space_percentage
2040
1
                                 << " inode_percent=" << inode_percentage
2041
1
                                 << " size_percent=" << size_percentage
2042
1
                                 << " is_space_insufficient=" << is_space_insufficient
2043
1
                                 << " is_inode_insufficient=" << is_inode_insufficient
2044
1
                                 << " is_size_insufficient=" << is_size_insufficient
2045
1
                                 << " need evict cache in advance";
2046
4
    }
2047
13
}
2048
2049
155
void BlockFileCache::run_background_monitor() {
2050
155
    Thread::set_self_name("run_background_monitor");
2051
170
    while (!_close) {
2052
170
        int64_t interval_ms = config::file_cache_background_monitor_interval_ms;
2053
170
        TEST_SYNC_POINT_CALLBACK("BlockFileCache::set_sleep_time", &interval_ms);
2054
170
        check_disk_resource_limit();
2055
170
        if (config::enable_evict_file_cache_in_advance) {
2056
7
            check_need_evict_cache_in_advance();
2057
163
        } else {
2058
163
            _need_evict_cache_in_advance = false;
2059
163
            _need_evict_cache_in_advance_metrics->set_value(0);
2060
163
        }
2061
2062
170
        {
2063
170
            std::unique_lock close_lock(_close_mtx);
2064
170
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2065
170
            if (_close) {
2066
155
                break;
2067
155
            }
2068
170
        }
2069
        // report
2070
15
        {
2071
15
            SCOPED_CACHE_LOCK(_mutex, this);
2072
15
            _cur_cache_size_metrics->set_value(_cur_cache_size);
2073
15
            _cur_ttl_cache_size_metrics->set_value(_cur_cache_size -
2074
15
                                                   _index_queue.get_capacity(cache_lock) -
2075
15
                                                   _normal_queue.get_capacity(cache_lock) -
2076
15
                                                   _disposable_queue.get_capacity(cache_lock));
2077
15
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(
2078
15
                    _ttl_queue.get_capacity(cache_lock));
2079
15
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(
2080
15
                    _ttl_queue.get_elements_num(cache_lock));
2081
15
            _cur_normal_queue_cache_size_metrics->set_value(_normal_queue.get_capacity(cache_lock));
2082
15
            _cur_normal_queue_element_count_metrics->set_value(
2083
15
                    _normal_queue.get_elements_num(cache_lock));
2084
15
            _cur_index_queue_cache_size_metrics->set_value(_index_queue.get_capacity(cache_lock));
2085
15
            _cur_index_queue_element_count_metrics->set_value(
2086
15
                    _index_queue.get_elements_num(cache_lock));
2087
15
            _cur_disposable_queue_cache_size_metrics->set_value(
2088
15
                    _disposable_queue.get_capacity(cache_lock));
2089
15
            _cur_disposable_queue_element_count_metrics->set_value(
2090
15
                    _disposable_queue.get_elements_num(cache_lock));
2091
2092
            // Update meta store write queue size if storage is FSFileCacheStorage
2093
15
            if (_storage->get_type() == FileCacheStorageType::DISK) {
2094
13
                auto* fs_storage = dynamic_cast<FSFileCacheStorage*>(_storage.get());
2095
13
                if (fs_storage != nullptr) {
2096
13
                    auto* meta_store = fs_storage->get_meta_store();
2097
13
                    if (meta_store != nullptr) {
2098
13
                        _meta_store_write_queue_size_metrics->set_value(
2099
13
                                meta_store->get_write_queue_size());
2100
13
                    }
2101
13
                }
2102
13
            }
2103
2104
15
            if (_num_read_blocks->get_value() > 0) {
2105
12
                _hit_ratio->set_value((double)_num_hit_blocks->get_value() /
2106
12
                                      (double)_num_read_blocks->get_value());
2107
12
            }
2108
15
            if (_num_read_blocks_5m && _num_read_blocks_5m->get_value() > 0) {
2109
0
                _hit_ratio_5m->set_value((double)_num_hit_blocks_5m->get_value() /
2110
0
                                         (double)_num_read_blocks_5m->get_value());
2111
0
            }
2112
15
            if (_num_read_blocks_1h && _num_read_blocks_1h->get_value() > 0) {
2113
0
                _hit_ratio_1h->set_value((double)_num_hit_blocks_1h->get_value() /
2114
0
                                         (double)_num_read_blocks_1h->get_value());
2115
0
            }
2116
2117
15
            if (_no_warmup_num_read_blocks->get_value() > 0) {
2118
12
                _no_warmup_hit_ratio->set_value((double)_no_warmup_num_hit_blocks->get_value() /
2119
12
                                                (double)_no_warmup_num_read_blocks->get_value());
2120
12
            }
2121
15
            if (_no_warmup_num_read_blocks_5m && _no_warmup_num_read_blocks_5m->get_value() > 0) {
2122
0
                _no_warmup_hit_ratio_5m->set_value(
2123
0
                        (double)_no_warmup_num_hit_blocks_5m->get_value() /
2124
0
                        (double)_no_warmup_num_read_blocks_5m->get_value());
2125
0
            }
2126
15
            if (_no_warmup_num_read_blocks_1h && _no_warmup_num_read_blocks_1h->get_value() > 0) {
2127
0
                _no_warmup_hit_ratio_1h->set_value(
2128
0
                        (double)_no_warmup_num_hit_blocks_1h->get_value() /
2129
0
                        (double)_no_warmup_num_read_blocks_1h->get_value());
2130
0
            }
2131
15
        }
2132
15
    }
2133
155
}
2134
2135
155
void BlockFileCache::run_background_gc() {
2136
155
    Thread::set_self_name("run_background_gc");
2137
155
    FileCacheKey key;
2138
155
    size_t batch_count = 0;
2139
399
    while (!_close) {
2140
399
        int64_t interval_ms = config::file_cache_background_gc_interval_ms;
2141
399
        size_t batch_limit = config::file_cache_remove_block_qps_limit * interval_ms / 1000;
2142
399
        {
2143
399
            std::unique_lock close_lock(_close_mtx);
2144
399
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2145
399
            if (_close) {
2146
155
                break;
2147
155
            }
2148
399
        }
2149
2150
249
        while (batch_count < batch_limit && _recycle_keys.try_dequeue(key)) {
2151
5
            int64_t duration_ns = 0;
2152
5
            Status st;
2153
5
            {
2154
5
                SCOPED_RAW_TIMER(&duration_ns);
2155
5
                st = _storage->remove(key);
2156
5
            }
2157
5
            *_storage_async_remove_latency_us << (duration_ns / 1000);
2158
2159
5
            if (!st.ok()) {
2160
0
                LOG_WARNING("").error(st);
2161
0
            }
2162
5
            batch_count++;
2163
5
        }
2164
244
        *_recycle_keys_length_recorder << _recycle_keys.size_approx();
2165
244
        batch_count = 0;
2166
244
    }
2167
155
}
2168
2169
155
void BlockFileCache::run_background_evict_in_advance() {
2170
155
    Thread::set_self_name("run_background_evict_in_advance");
2171
155
    LOG(INFO) << "Starting background evict in advance thread";
2172
155
    int64_t batch = 0;
2173
277
    while (!_close) {
2174
277
        {
2175
277
            std::unique_lock close_lock(_close_mtx);
2176
277
            _close_cv.wait_for(
2177
277
                    close_lock,
2178
277
                    std::chrono::milliseconds(config::file_cache_evict_in_advance_interval_ms));
2179
277
            if (_close) {
2180
155
                LOG(INFO) << "Background evict in advance thread exiting due to cache closing";
2181
155
                break;
2182
155
            }
2183
277
        }
2184
122
        batch = config::file_cache_evict_in_advance_batch_bytes;
2185
2186
        // Skip if eviction not needed or too many pending recycles
2187
122
        if (!_need_evict_cache_in_advance ||
2188
122
            _recycle_keys.size_approx() >=
2189
121
                    config::file_cache_evict_in_advance_recycle_keys_num_threshold) {
2190
121
            continue;
2191
121
        }
2192
2193
1
        int64_t duration_ns = 0;
2194
1
        {
2195
1
            SCOPED_CACHE_LOCK(_mutex, this);
2196
1
            SCOPED_RAW_TIMER(&duration_ns);
2197
1
            try_evict_in_advance(batch, cache_lock);
2198
1
        }
2199
1
        *_evict_in_advance_latency_us << (duration_ns / 1000);
2200
1
    }
2201
155
}
2202
2203
155
void BlockFileCache::run_background_block_lru_update() {
2204
155
    Thread::set_self_name("run_background_block_lru_update");
2205
155
    std::vector<FileBlockSPtr> batch;
2206
199
    while (!_close) {
2207
199
        int64_t interval_ms = config::file_cache_background_block_lru_update_interval_ms;
2208
199
        size_t batch_limit =
2209
199
                config::file_cache_background_block_lru_update_qps_limit * interval_ms / 1000;
2210
199
        {
2211
199
            std::unique_lock close_lock(_close_mtx);
2212
199
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2213
199
            if (_close) {
2214
155
                break;
2215
155
            }
2216
199
        }
2217
2218
44
        batch.clear();
2219
44
        batch.reserve(batch_limit);
2220
44
        size_t drained = _need_update_lru_blocks.drain(batch_limit, &batch);
2221
44
        if (drained == 0) {
2222
39
            *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2223
39
            continue;
2224
39
        }
2225
5
        *_need_update_lru_blocks_consume_metrics << drained;
2226
2227
5
        int64_t duration_ns = 0;
2228
5
        {
2229
5
            SCOPED_CACHE_LOCK(_mutex, this);
2230
5
            SCOPED_RAW_TIMER(&duration_ns);
2231
5
            for (auto& block : batch) {
2232
5
                update_block_lru(block, cache_lock);
2233
5
            }
2234
5
        }
2235
5
        *_update_lru_blocks_latency_us << (duration_ns / 1000);
2236
5
        *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2237
5
    }
2238
155
}
2239
2240
std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
2241
2
BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const {
2242
2
    int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
2243
2
                               std::chrono::steady_clock::now().time_since_epoch())
2244
2
                               .count();
2245
2
    SCOPED_CACHE_LOCK(_mutex, this);
2246
2
    std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>> blocks_meta;
2247
2
    if (auto iter = _files.find(hash); iter != _files.end()) {
2248
5
        for (auto& pair : _files.find(hash)->second) {
2249
5
            const FileBlockCell* cell = &pair.second;
2250
5
            if (cell->file_block->cache_type() != FileCacheType::DISPOSABLE) {
2251
4
                if (cell->file_block->cache_type() == FileCacheType::TTL ||
2252
4
                    (cell->atime != 0 &&
2253
3
                     cur_time - cell->atime <
2254
3
                             get_queue(cell->file_block->cache_type()).get_hot_data_interval())) {
2255
3
                    blocks_meta.emplace_back(pair.first, cell->size(),
2256
3
                                             cell->file_block->cache_type(),
2257
3
                                             cell->file_block->expiration_time());
2258
3
                }
2259
4
            }
2260
5
        }
2261
2
    }
2262
2
    return blocks_meta;
2263
2
}
2264
2265
bool BlockFileCache::try_reserve_during_async_load(size_t size,
2266
4
                                                   std::lock_guard<std::mutex>& cache_lock) {
2267
4
    size_t removed_size = 0;
2268
4
    size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
2269
4
    size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
2270
4
    size_t index_queue_size = _index_queue.get_capacity(cache_lock);
2271
2272
4
    std::vector<FileBlockCell*> to_evict;
2273
4
    auto collect_eliminate_fragments = [&](LRUQueue& queue) {
2274
4
        for (const auto& [entry_key, entry_offset, entry_size] : queue) {
2275
4
            if (!_disk_resource_limit_mode || removed_size >= size) {
2276
3
                break;
2277
3
            }
2278
1
            auto* cell = get_cell(entry_key, entry_offset, cache_lock);
2279
2280
1
            DCHECK(cell) << "Cache became inconsistent. UInt128Wrapper: " << entry_key.to_string()
2281
0
                         << ", offset: " << entry_offset;
2282
2283
1
            size_t cell_size = cell->size();
2284
1
            DCHECK(entry_size == cell_size);
2285
2286
1
            if (cell->releasable()) {
2287
1
                auto& file_block = cell->file_block;
2288
2289
1
                std::lock_guard block_lock(file_block->_mutex);
2290
1
                DCHECK(file_block->_download_state == FileBlock::State::DOWNLOADED);
2291
1
                to_evict.push_back(cell);
2292
1
                removed_size += cell_size;
2293
1
            }
2294
1
        }
2295
3
    };
2296
4
    if (disposable_queue_size != 0) {
2297
0
        collect_eliminate_fragments(get_queue(FileCacheType::DISPOSABLE));
2298
0
    }
2299
4
    if (normal_queue_size != 0) {
2300
3
        collect_eliminate_fragments(get_queue(FileCacheType::NORMAL));
2301
3
    }
2302
4
    if (index_queue_size != 0) {
2303
0
        collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
2304
0
    }
2305
4
    std::string reason = "async load";
2306
4
    remove_file_blocks(to_evict, cache_lock, true, reason);
2307
2308
4
    return !_disk_resource_limit_mode || removed_size >= size;
2309
4
}
2310
2311
21
void BlockFileCache::clear_need_update_lru_blocks() {
2312
21
    _need_update_lru_blocks.clear();
2313
21
    *_need_update_lru_blocks_length_recorder << _need_update_lru_blocks.size();
2314
21
}
2315
2316
17
void BlockFileCache::pause_ttl_manager() {
2317
17
    if (_ttl_mgr) {
2318
4
        _ttl_mgr->stop();
2319
4
    }
2320
17
}
2321
2322
17
void BlockFileCache::resume_ttl_manager() {
2323
17
    if (_ttl_mgr) {
2324
4
        _ttl_mgr->resume();
2325
4
    }
2326
17
}
2327
2328
17
std::string BlockFileCache::clear_file_cache_directly() {
2329
17
    pause_ttl_manager();
2330
17
    _lru_dumper->remove_lru_dump_files();
2331
17
    using namespace std::chrono;
2332
17
    std::stringstream ss;
2333
17
    auto start = steady_clock::now();
2334
17
    std::string result;
2335
17
    {
2336
17
        SCOPED_CACHE_LOCK(_mutex, this);
2337
17
        LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
2338
2339
17
        std::string clear_msg;
2340
17
        auto s = _storage->clear(clear_msg);
2341
17
        if (!s.ok()) {
2342
1
            result = clear_msg;
2343
16
        } else {
2344
16
            int64_t num_files = _files.size();
2345
16
            int64_t cache_size = _cur_cache_size;
2346
16
            int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
2347
16
            int64_t normal_queue_size = _normal_queue.get_elements_num(cache_lock);
2348
16
            int64_t disposible_queue_size = _disposable_queue.get_elements_num(cache_lock);
2349
16
            int64_t ttl_queue_size = _ttl_queue.get_elements_num(cache_lock);
2350
2351
16
            int64_t clear_fd_duration = 0;
2352
16
            {
2353
                // clear FDCache to release fd
2354
16
                SCOPED_RAW_TIMER(&clear_fd_duration);
2355
5.12k
                for (const auto& [file_key, file_blocks] : _files) {
2356
5.12k
                    for (const auto& [offset, file_block_cell] : file_blocks) {
2357
5.12k
                        AccessKeyAndOffset access_key_and_offset(file_key, offset);
2358
5.12k
                        FDCache::instance()->remove_file_reader(access_key_and_offset);
2359
5.12k
                    }
2360
5.12k
                }
2361
16
            }
2362
2363
16
            _files.clear();
2364
16
            _cur_cache_size = 0;
2365
16
            _cur_ttl_size = 0;
2366
16
            _time_to_key.clear();
2367
16
            _key_to_time.clear();
2368
16
            _index_queue.clear(cache_lock);
2369
16
            _normal_queue.clear(cache_lock);
2370
16
            _disposable_queue.clear(cache_lock);
2371
16
            _ttl_queue.clear(cache_lock);
2372
2373
            // Update cache metrics immediately so consumers observe the cleared state
2374
            // without waiting for the next background monitor round.
2375
16
            _cur_cache_size_metrics->set_value(0);
2376
16
            _cur_ttl_cache_size_metrics->set_value(0);
2377
16
            _cur_ttl_cache_lru_queue_cache_size_metrics->set_value(0);
2378
16
            _cur_ttl_cache_lru_queue_element_count_metrics->set_value(0);
2379
16
            _cur_normal_queue_cache_size_metrics->set_value(0);
2380
16
            _cur_normal_queue_element_count_metrics->set_value(0);
2381
16
            _cur_index_queue_cache_size_metrics->set_value(0);
2382
16
            _cur_index_queue_element_count_metrics->set_value(0);
2383
16
            _cur_disposable_queue_cache_size_metrics->set_value(0);
2384
16
            _cur_disposable_queue_element_count_metrics->set_value(0);
2385
2386
16
            clear_need_update_lru_blocks();
2387
2388
16
            ss << "finish clear_file_cache_directly"
2389
16
               << " path=" << _cache_base_path << " time_elapsed_ms="
2390
16
               << duration_cast<milliseconds>(steady_clock::now() - start).count()
2391
16
               << " fd_clear_time_ms=" << (clear_fd_duration / 1000000)
2392
16
               << " num_files=" << num_files << " cache_size=" << cache_size
2393
16
               << " index_queue_size=" << index_queue_size
2394
16
               << " normal_queue_size=" << normal_queue_size
2395
16
               << " disposible_queue_size=" << disposible_queue_size
2396
16
               << "ttl_queue_size=" << ttl_queue_size;
2397
16
            result = ss.str();
2398
16
            LOG(INFO) << result;
2399
16
        }
2400
17
    }
2401
17
    _lru_dumper->remove_lru_dump_files();
2402
17
    resume_ttl_manager();
2403
17
    return result;
2404
17
}
2405
2406
5.14k
std::map<size_t, FileBlockSPtr> BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) {
2407
5.14k
    std::map<size_t, FileBlockSPtr> offset_to_block;
2408
5.14k
    SCOPED_CACHE_LOCK(_mutex, this);
2409
5.14k
    if (_files.contains(hash)) {
2410
5.14k
        for (auto& [offset, cell] : _files[hash]) {
2411
5.14k
            if (cell.file_block->state() == FileBlock::State::DOWNLOADED) {
2412
5.14k
                cell.file_block->_owned_by_cached_reader = true;
2413
5.14k
                offset_to_block.emplace(offset, cell.file_block);
2414
5.14k
            }
2415
5.14k
        }
2416
5.13k
    }
2417
5.14k
    return offset_to_block;
2418
5.14k
}
2419
2420
0
void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) {
2421
0
    SCOPED_CACHE_LOCK(_mutex, this);
2422
0
    if (auto iter = _files.find(hash); iter != _files.end()) {
2423
0
        for (auto& [_, cell] : iter->second) {
2424
0
            cell.update_atime();
2425
0
        }
2426
0
    };
2427
0
}
2428
2429
155
void BlockFileCache::run_background_lru_log_replay() {
2430
155
    Thread::set_self_name("run_background_lru_log_replay");
2431
107k
    while (!_close) {
2432
107k
        int64_t interval_ms = config::file_cache_background_lru_log_replay_interval_ms;
2433
107k
        {
2434
107k
            std::unique_lock close_lock(_close_mtx);
2435
107k
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2436
107k
            if (_close) {
2437
136
                break;
2438
136
            }
2439
107k
        }
2440
2441
106k
        replay_lru_logs_once();
2442
106k
    }
2443
155
}
2444
2445
106k
size_t BlockFileCache::replay_lru_logs_once() {
2446
106k
    size_t replayed = 0;
2447
428k
    for (FileCacheType type : LRU_LOG_REPLAY_TYPES) {
2448
428k
        replayed += _lru_recorder->replay_queue_event(type);
2449
428k
    }
2450
2451
106k
    if (replayed == 0) {
2452
91.1k
        *_lru_recorder_log_replay_idle_metrics << 1;
2453
91.1k
    }
2454
2455
106k
    if (config::enable_evaluate_shadow_queue_diff) {
2456
0
        SCOPED_CACHE_LOCK(_mutex, this);
2457
0
        _lru_recorder->evaluate_queue_diff(_ttl_queue, "ttl", cache_lock);
2458
0
        _lru_recorder->evaluate_queue_diff(_index_queue, "index", cache_lock);
2459
0
        _lru_recorder->evaluate_queue_diff(_normal_queue, "normal", cache_lock);
2460
0
        _lru_recorder->evaluate_queue_diff(_disposable_queue, "disposable", cache_lock);
2461
0
    }
2462
106k
    return replayed;
2463
106k
}
2464
2465
14
void BlockFileCache::dump_lru_queues(bool force) {
2466
14
    std::unique_lock dump_lock(_dump_lru_queues_mtx);
2467
14
    if (config::file_cache_background_lru_dump_tail_record_num > 0 &&
2468
14
        !ExecEnv::GetInstance()->get_is_upgrading()) {
2469
14
        _lru_dumper->dump_queue("disposable", force);
2470
14
        _lru_dumper->dump_queue("normal", force);
2471
14
        _lru_dumper->dump_queue("index", force);
2472
14
        _lru_dumper->dump_queue("ttl", force);
2473
14
        _lru_dumper->set_first_dump_done();
2474
14
    }
2475
14
}
2476
2477
155
void BlockFileCache::run_background_lru_dump() {
2478
155
    Thread::set_self_name("run_background_lru_dump");
2479
169
    while (!_close) {
2480
169
        int64_t interval_ms = config::file_cache_background_lru_dump_interval_ms;
2481
169
        {
2482
169
            std::unique_lock close_lock(_close_mtx);
2483
169
            _close_cv.wait_for(close_lock, std::chrono::milliseconds(interval_ms));
2484
169
            if (_close) {
2485
155
                break;
2486
155
            }
2487
169
        }
2488
14
        dump_lru_queues(false);
2489
14
    }
2490
155
}
2491
2492
155
void BlockFileCache::restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock) {
2493
    // keep this order coz may be duplicated in different queue, we use the first appearence
2494
155
    _lru_dumper->restore_queue(_ttl_queue, "ttl", cache_lock);
2495
155
    _lru_dumper->restore_queue(_index_queue, "index", cache_lock);
2496
155
    _lru_dumper->restore_queue(_normal_queue, "normal", cache_lock);
2497
155
    _lru_dumper->restore_queue(_disposable_queue, "disposable", cache_lock);
2498
155
}
2499
2500
0
std::map<std::string, double> BlockFileCache::get_stats() {
2501
0
    std::map<std::string, double> stats;
2502
0
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2503
0
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2504
0
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2505
2506
0
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2507
0
    stats["index_queue_curr_size"] = (double)_cur_index_queue_cache_size_metrics->get_value();
2508
0
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2509
0
    stats["index_queue_curr_elements"] =
2510
0
            (double)_cur_index_queue_element_count_metrics->get_value();
2511
2512
0
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2513
0
    stats["ttl_queue_curr_size"] = (double)_cur_ttl_cache_lru_queue_cache_size_metrics->get_value();
2514
0
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2515
0
    stats["ttl_queue_curr_elements"] =
2516
0
            (double)_cur_ttl_cache_lru_queue_element_count_metrics->get_value();
2517
2518
0
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2519
0
    stats["normal_queue_curr_size"] = (double)_cur_normal_queue_cache_size_metrics->get_value();
2520
0
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2521
0
    stats["normal_queue_curr_elements"] =
2522
0
            (double)_cur_normal_queue_element_count_metrics->get_value();
2523
2524
0
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2525
0
    stats["disposable_queue_curr_size"] =
2526
0
            (double)_cur_disposable_queue_cache_size_metrics->get_value();
2527
0
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2528
0
    stats["disposable_queue_curr_elements"] =
2529
0
            (double)_cur_disposable_queue_element_count_metrics->get_value();
2530
2531
0
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2532
0
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2533
2534
0
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2535
0
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2536
0
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2537
2538
0
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2539
0
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2540
0
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2541
2542
0
    return stats;
2543
0
}
2544
2545
// for be UTs
2546
172
std::map<std::string, double> BlockFileCache::get_stats_unsafe() {
2547
172
    std::map<std::string, double> stats;
2548
172
    stats["hits_ratio"] = (double)_hit_ratio->get_value();
2549
172
    stats["hits_ratio_5m"] = (double)_hit_ratio_5m->get_value();
2550
172
    stats["hits_ratio_1h"] = (double)_hit_ratio_1h->get_value();
2551
2552
172
    stats["index_queue_max_size"] = (double)_index_queue.get_max_size();
2553
172
    stats["index_queue_curr_size"] = (double)_index_queue.get_capacity_unsafe();
2554
172
    stats["index_queue_max_elements"] = (double)_index_queue.get_max_element_size();
2555
172
    stats["index_queue_curr_elements"] = (double)_index_queue.get_elements_num_unsafe();
2556
2557
172
    stats["ttl_queue_max_size"] = (double)_ttl_queue.get_max_size();
2558
172
    stats["ttl_queue_curr_size"] = (double)_ttl_queue.get_capacity_unsafe();
2559
172
    stats["ttl_queue_max_elements"] = (double)_ttl_queue.get_max_element_size();
2560
172
    stats["ttl_queue_curr_elements"] = (double)_ttl_queue.get_elements_num_unsafe();
2561
2562
172
    stats["normal_queue_max_size"] = (double)_normal_queue.get_max_size();
2563
172
    stats["normal_queue_curr_size"] = (double)_normal_queue.get_capacity_unsafe();
2564
172
    stats["normal_queue_max_elements"] = (double)_normal_queue.get_max_element_size();
2565
172
    stats["normal_queue_curr_elements"] = (double)_normal_queue.get_elements_num_unsafe();
2566
2567
172
    stats["disposable_queue_max_size"] = (double)_disposable_queue.get_max_size();
2568
172
    stats["disposable_queue_curr_size"] = (double)_disposable_queue.get_capacity_unsafe();
2569
172
    stats["disposable_queue_max_elements"] = (double)_disposable_queue.get_max_element_size();
2570
172
    stats["disposable_queue_curr_elements"] = (double)_disposable_queue.get_elements_num_unsafe();
2571
2572
172
    stats["need_evict_cache_in_advance"] = (double)_need_evict_cache_in_advance;
2573
172
    stats["disk_resource_limit_mode"] = (double)_disk_resource_limit_mode;
2574
2575
172
    stats["total_removed_counts"] = (double)_num_removed_blocks->get_value();
2576
172
    stats["total_hit_counts"] = (double)_num_hit_blocks->get_value();
2577
172
    stats["total_read_counts"] = (double)_num_read_blocks->get_value();
2578
2579
172
    stats["total_read_size"] = (double)_total_read_size_metrics->get_value();
2580
172
    stats["total_hit_size"] = (double)_total_hit_size_metrics->get_value();
2581
172
    stats["total_removed_size"] = (double)_total_evict_size_metrics->get_value();
2582
2583
172
    return stats;
2584
172
}
2585
2586
// Explicit instantiation of the BlockFileCache::remove member template for the
// std::lock_guard<std::mutex> lock argument types listed below.
template void BlockFileCache::remove(FileBlockSPtr file_block,
2587
                                     std::lock_guard<std::mutex>& cache_lock,
2588
                                     std::lock_guard<std::mutex>& block_lock, bool sync);
2589
2590
1
/// Runs the consistency check and renders every detected inconsistency as a text report.
///
/// Each report holds the manager-side info, the storage-side info and the inconsistency type,
/// in that order, and is appended to `results`.
///
/// @param results output vector; one string is appended per inconsistency.
/// @return the error from check_file_cache_consistency if it fails, otherwise Status::OK().
Status BlockFileCache::report_file_cache_inconsistency(std::vector<std::string>& results) {
    InconsistencyContext ctx;
    RETURN_IF_ERROR(check_file_cache_consistency(ctx));

    const auto count = ctx.types.size();
    results.reserve(count);
    for (size_t idx = 0; idx < count; ++idx) {
        std::string report;
        report.append("File cache info in manager:\n");
        report.append(ctx.infos_in_manager[idx].to_string());
        report.append("File cache info in storage:\n");
        report.append(ctx.infos_in_storage[idx].to_string());
        report.append(ctx.types[idx].to_string());
        report.append("\n");
        results.push_back(std::move(report));
    }
    return Status::OK();
}
2607
2608
1
/// Compares the block infos reported by the storage with the in-memory cells and records every
/// mismatch in `inconsistency_context`.
///
/// The three vectors of the context (`infos_in_manager`, `infos_in_storage`, `types`) are
/// filled in lock-step, one entry each per inconsistency:
///   - storage entry with no cell (or a cell without a block): NOT_LOADED, with a
///     default-constructed manager info;
///   - storage entry whose tmp flag, size, cache type or expiration time differs from the
///     cell: the corresponding flags OR-ed together;
///   - cell that the storage did not report: MISSING_IN_STORAGE, with a default-constructed
///     storage info.
///
/// The cache mutex is held for the whole check.
///
/// @param inconsistency_context output; entries are appended, never removed.
/// @return the error from the storage's get_file_cache_infos if it fails, otherwise
///         Status::OK().
Status BlockFileCache::check_file_cache_consistency(InconsistencyContext& inconsistency_context) {
    std::lock_guard<std::mutex> cache_lock(_mutex);
    std::vector<FileCacheInfo> infos_in_storage;
    RETURN_IF_ERROR(_storage->get_file_cache_infos(infos_in_storage, cache_lock));

    // Pass 1: walk what the storage knows and compare it with the in-memory cell.
    std::unordered_set<AccessKeyAndOffset, KeyAndOffsetHash> seen_in_storage;
    for (const auto& storage_info : infos_in_storage) {
        seen_in_storage.insert({storage_info.hash, storage_info.offset});
        auto* cell = get_cell(storage_info.hash, storage_info.offset, cache_lock);
        const bool loaded = (cell != nullptr && cell->file_block != nullptr);
        if (!loaded) {
            inconsistency_context.infos_in_manager.emplace_back();
            inconsistency_context.infos_in_storage.push_back(storage_info);
            inconsistency_context.types.emplace_back(InconsistencyType::NOT_LOADED);
            continue;
        }

        FileCacheInfo manager_info {
                .hash = storage_info.hash,
                .expiration_time = cell->file_block->expiration_time(),
                .size = cell->size(),
                .offset = storage_info.offset,
                .is_tmp = cell->file_block->state() == FileBlock::State::DOWNLOADING,
                .cache_type = cell->file_block->cache_type()};

        InconsistencyType mismatch;
        if (storage_info.is_tmp != manager_info.is_tmp) {
            mismatch |= InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE;
        }

        // A block that is still downloading is compared against its downloading size.
        size_t expected_size = manager_info.size;
        if (manager_info.is_tmp) {
            expected_size = cell->dowloading_size();
        }
        if (storage_info.size != expected_size) {
            mismatch |= InconsistencyType::SIZE_INCONSISTENT;
        }

        // Only if it is not a tmp file need we check the cache type.
        if ((mismatch & InconsistencyType::TMP_FILE_EXPECT_DOWNLOADING_STATE) == 0 &&
            storage_info.cache_type != manager_info.cache_type) {
            mismatch |= InconsistencyType::CACHE_TYPE_INCONSISTENT;
        }
        if (storage_info.expiration_time != manager_info.expiration_time) {
            mismatch |= InconsistencyType::EXPIRATION_TIME_INCONSISTENT;
        }

        if (mismatch != InconsistencyType::NONE) {
            inconsistency_context.infos_in_manager.push_back(manager_info);
            inconsistency_context.infos_in_storage.push_back(storage_info);
            inconsistency_context.types.push_back(mismatch);
        }
    }

    // Pass 2: every in-memory cell the storage did not report is missing in storage.
    for (const auto& [hash, offset_to_cell] : _files) {
        for (const auto& [offset, cell] : offset_to_cell) {
            if (!confirmed_or_seen(seen_in_storage, hash, offset)) {
                const auto& block = cell.file_block;
                inconsistency_context.infos_in_manager.emplace_back(
                        hash, block->expiration_time(), cell.size(), offset,
                        cell.file_block->state() == FileBlock::State::DOWNLOADING,
                        block->cache_type());
                inconsistency_context.infos_in_storage.emplace_back();
                inconsistency_context.types.emplace_back(InconsistencyType::MISSING_IN_STORAGE);
            }
        }
    }
    return Status::OK();
}
2668
2669
} // namespace doris::io