Coverage Report

Created: 2026-06-03 02:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/block_file_cache.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bvar/bvar.h>
#include <concurrentqueue.h>

#include <algorithm>
#include <array>
#include <atomic>
#include <boost/lockfree/spsc_queue.hpp>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "io/cache/block_file_cache_ttl_mgr.h"
#include "io/cache/cache_lru_dumper.h"
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/cache/file_cache_storage.h"
#include "io/cache/lru_queue_recorder.h"
#include "runtime/runtime_profile.h"
#include "util/threadpool.h"
44
45
namespace doris::io {
46
// Concurrent queue of cache keys whose blocks are pending asynchronous removal
// (the type of BlockFileCache::_recycle_keys).
using RecycleFileCacheKeys = moodycamel::ConcurrentQueue<FileCacheKey>;
47
48
class LockScopedTimer {
49
public:
50
8.49k
    LockScopedTimer() : start_(std::chrono::steady_clock::now()) {}
51
8.49k
    ~LockScopedTimer() {
52
8.49k
        auto end = std::chrono::steady_clock::now();
53
8.49k
        auto duration_us =
54
8.49k
                std::chrono::duration_cast<std::chrono::microseconds>(end - start_).count();
55
8.49k
        if (duration_us > config::cache_lock_held_long_tail_threshold_us) {
56
0
            LOG(WARNING) << "Lock held time " << std::to_string(duration_us) << "us. "
57
0
                         << get_stack_trace();
58
0
        }
59
8.49k
    }
60
61
private:
62
    std::chrono::time_point<std::chrono::steady_clock> start_;
63
};
64
65
// Note: the cache_lock is scoped, so do not add do...while(0) here.
//
// SCOPED_CACHE_LOCK(MUTEX, cache) declares, in the caller's scope:
//   - `cache_lock`: a std::lock_guard holding MUTEX until the end of the scope;
//   - `cache_lock_timer`: a LockScopedTimer that warns when the lock is held too long.
// The time spent waiting to acquire MUTEX is recorded into
// `(cache)->_cache_lock_wait_time_us` and logged with a stack trace when it exceeds
// config::cache_lock_wait_long_tail_threshold_us. The helper locals `start_time`,
// `acq_time` and `duration_us` are visible in the caller's scope as well, so the
// macro can be used at most once per scope.
// `cache_lock_timer` is declared after `cache_lock`, so it is destroyed first and
// stops timing just before the mutex is released.
#define SCOPED_CACHE_LOCK(MUTEX, cache)                                                           \
    std::chrono::time_point<std::chrono::steady_clock> start_time =                               \
            std::chrono::steady_clock::now();                                                     \
    std::lock_guard cache_lock(MUTEX);                                                            \
    std::chrono::time_point<std::chrono::steady_clock> acq_time =                                 \
            std::chrono::steady_clock::now();                                                     \
    auto duration_us =                                                                            \
            std::chrono::duration_cast<std::chrono::microseconds>(acq_time - start_time).count(); \
    *((cache)->_cache_lock_wait_time_us) << duration_us;                                          \
    if (duration_us > config::cache_lock_wait_long_tail_threshold_us) {                           \
        LOG(WARNING) << "Lock wait time " << std::to_string(duration_us) << "us. "                \
                     << get_stack_trace();                                                        \
    }                                                                                             \
    LockScopedTimer cache_lock_timer;
80
81
// Forward declaration; BlockFileCache declares this class as a friend.
class FSFileCacheStorage;
82
83
// NeedUpdateLRUBlocks keeps FileBlockSPtr entries that require LRU updates in a
84
// deduplicated, sharded container. Entries are keyed by the raw FileBlock
85
// pointer so that multiple shared_ptr copies of the same block are treated as a
86
// single pending update. The structure is thread-safe and optimized for high
87
// contention insert/drain workloads in the background update thread.
88
// Note that Blocks are updated in batch, internal order is not important.
89
class NeedUpdateLRUBlocks {
90
public:
91
184
    NeedUpdateLRUBlocks() = default;
92
93
    // Insert a block into the pending set. Returns true only when the block
94
    // was not already queued. Null inputs are ignored.
95
    bool insert(FileBlockSPtr block);
96
97
    // Drain up to `limit` unique blocks into `output`. The method returns how
98
    // many blocks were actually drained and shrinks the internal size
99
    // accordingly.
100
    size_t drain(size_t limit, std::vector<FileBlockSPtr>* output);
101
102
    // Remove every pending block from the structure and reset the size.
103
    void clear();
104
105
    // Thread-safe approximate size of queued unique blocks.
106
9.61k
    size_t size() const { return _size.load(std::memory_order_relaxed); }
107
108
private:
109
    static constexpr size_t kShardCount = 64;
110
    static constexpr size_t kShardMask = kShardCount - 1;
111
112
    struct Shard {
113
        std::mutex mutex;
114
        std::unordered_map<FileBlock*, FileBlockSPtr> entries;
115
    };
116
117
    size_t shard_index(FileBlock* ptr) const;
118
119
    std::array<Shard, kShardCount> _shards;
120
    std::atomic<size_t> _size {0};
121
};
122
123
// BlockFileCache (declared below, after FileBlockCell) is responsible for managing the cached blocks.
// The current strategies are LRU and TTL.
125
126
struct FileBlockCell {
127
    friend class FileBlock;
128
129
    FileBlockSPtr file_block;
130
    /// Iterator is put here on first reservation attempt, if successful.
131
    std::optional<LRUQueue::Iterator> queue_iterator;
132
133
    mutable int64_t atime {0};
134
730k
    void update_atime() const {
135
730k
        atime = std::chrono::duration_cast<std::chrono::seconds>(
136
730k
                        std::chrono::steady_clock::now().time_since_epoch())
137
730k
                        .count();
138
730k
    }
139
140
    /// Pointer to file block is always hold by the cache itself.
141
    /// Apart from pointer in cache, it can be hold by cache users, when they call
142
    /// getorSet(), but cache users always hold it via FileBlocksHolder.
143
7.02k
    bool releasable() const {
144
7.02k
        return (file_block.use_count() == 1 ||
145
7.02k
                (file_block.use_count() == 2 && file_block->_owned_by_cached_reader));
146
7.02k
    }
147
148
549k
    size_t size() const { return file_block->_block_range.size(); }
149
150
70
    FileBlockCell() = default;
151
    FileBlockCell(FileBlockSPtr file_block, std::lock_guard<std::mutex>& cache_lock);
152
    FileBlockCell(FileBlockCell&& other) noexcept
153
27.2k
            : file_block(std::move(other.file_block)),
154
27.2k
              queue_iterator(other.queue_iterator),
155
27.2k
              atime(other.atime) {
156
27.2k
        file_block->cell = this;
157
27.2k
    }
158
159
    FileBlockCell& operator=(const FileBlockCell&) = delete;
160
    FileBlockCell(const FileBlockCell&) = delete;
161
162
0
    size_t dowloading_size() const { return file_block->_downloaded_size; }
163
};
164
165
class BlockFileCache {
166
    friend class FSFileCacheStorage;
167
    friend class MemFileCacheStorage;
168
    friend class FileBlock;
169
    friend struct FileBlocksHolder;
170
    friend class CacheLRUDumper;
171
    friend class LRUQueueRecorder;
172
    friend struct FileBlockCell;
173
    friend class BlockFileCacheTest;
174
175
public:
176
    // hash the file_name to uint128
177
    static UInt128Wrapper hash(const std::string& path);
178
179
    BlockFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings);
180
181
179
    virtual ~BlockFileCache() {
182
179
        {
183
179
            std::lock_guard lock(_close_mtx);
184
179
            _close = true;
185
179
        }
186
179
        _close_cv.notify_all();
187
179
        if (_cache_background_monitor_thread.joinable()) {
188
156
            _cache_background_monitor_thread.join();
189
156
        }
190
179
        if (_cache_background_gc_thread.joinable()) {
191
156
            _cache_background_gc_thread.join();
192
156
        }
193
179
        if (_cache_background_evict_in_advance_thread.joinable()) {
194
156
            _cache_background_evict_in_advance_thread.join();
195
156
        }
196
179
        if (_cache_background_lru_dump_thread.joinable()) {
197
156
            _cache_background_lru_dump_thread.join();
198
156
        }
199
179
        if (_cache_background_lru_log_replay_thread.joinable()) {
200
156
            _cache_background_lru_log_replay_thread.join();
201
156
        }
202
179
        if (_cache_background_block_lru_update_thread.joinable()) {
203
156
            _cache_background_block_lru_update_thread.join();
204
156
        }
205
179
        if (_ttl_mgr) {
206
137
            _ttl_mgr.reset();
207
137
        }
208
179
    }
209
210
    /// Restore cache from local filesystem.
211
    Status initialize();
212
213
    /// Cache capacity in bytes.
214
14
    [[nodiscard]] size_t capacity() const { return _capacity; }
215
216
    // try to release all releasable block
217
    // it maybe hang the io/system
218
    size_t try_release();
219
220
151
    [[nodiscard]] const std::string& get_base_path() const { return _cache_base_path; }
221
222
    // Get storage for inspection
223
2
    FileCacheStorage* get_storage() const { return _storage.get(); }
224
225
    /**
226
         * Given an `offset` and `size` representing [offset, offset + size) bytes interval,
227
         * return list of cached non-overlapping non-empty
228
         * file blocks `[block1, ..., blockN]` which intersect with given interval.
229
         *
230
         * blocks in returned list are ordered in ascending order and represent a full contiguous
231
         * interval (no holes). Each block in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY.
232
         *
233
         * As long as pointers to returned file blocks are hold
234
         * it is guaranteed that these file blocks are not removed from cache.
235
         */
236
    FileBlocksHolder get_or_set(const UInt128Wrapper& hash, size_t offset, size_t size,
237
                                CacheContext& context);
238
239
    /**
240
     * record blocks read directly by CachedRemoteFileReader
241
     */
242
    void add_need_update_lru_block(FileBlockSPtr block);
243
244
    /**
245
     * Clear all cached data for this cache instance async
246
     *
247
     * @returns summary message
248
     */
249
    std::string clear_file_cache_async();
250
    struct ClearFileCacheCancelToken {
251
        std::atomic_bool cancelled {false};
252
    };
253
    std::string clear_file_cache_sync(
254
            std::shared_ptr<ClearFileCacheCancelToken> cancel_token = nullptr);
255
#ifdef BE_TEST
256
    // Test-only helper. It bypasses FileBlock holder lifecycle and must not be used
257
    // by production clear paths.
258
    std::string clear_file_cache_directly();
259
#endif
260
261
    /**
262
     * Reset the cache capacity. If the new_capacity is smaller than _capacity, the redundant data will be remove async.
263
     *
264
     * @returns summary message
265
     */
266
    std::string reset_capacity(size_t new_capacity);
267
268
    std::map<size_t, FileBlockSPtr> get_blocks_by_key(const UInt128Wrapper& hash);
269
270
    /// For debug and UT
271
    std::string dump_structure(const UInt128Wrapper& hash);
272
    std::string dump_single_cache_type(const UInt128Wrapper& hash, size_t offset);
273
274
    void dump_lru_queues(bool force);
275
276
    [[nodiscard]] size_t get_used_cache_size(FileCacheType type) const;
277
278
    [[nodiscard]] size_t get_file_blocks_num(FileCacheType type) const;
279
280
    // change the block cache type
281
    void change_cache_type(const UInt128Wrapper& hash, size_t offset, FileCacheType new_type,
282
                           std::lock_guard<std::mutex>& cache_lock);
283
284
    // remove all blocks that belong to the key
285
    void remove_if_cached(const UInt128Wrapper& key);
286
    void remove_if_cached_async(const UInt128Wrapper& key);
287
288
    // Reset the block size and keep FileBlock, LRU queue, and cache counters consistent.
289
    void reset_range(const UInt128Wrapper&, size_t offset, size_t old_size, size_t new_size,
290
                     std::lock_guard<std::mutex>& cache_lock);
291
292
    // get the hotest blocks message by key
293
    // The tuple is composed of <offset, size, cache_type, expiration_time>
294
    [[nodiscard]] std::vector<std::tuple<size_t, size_t, FileCacheType, uint64_t>>
295
    get_hot_blocks_meta(const UInt128Wrapper& hash) const;
296
297
594
    [[nodiscard]] bool get_async_open_success() const { return _async_open_done; }
298
299
    BlockFileCache& operator=(const BlockFileCache&) = delete;
300
    BlockFileCache(const BlockFileCache&) = delete;
301
302
    // try to reserve the new space for the new block if the cache is full
303
    bool try_reserve(const UInt128Wrapper& hash, const CacheContext& context, size_t offset,
304
                     size_t size, std::lock_guard<std::mutex>& cache_lock);
305
306
    /**
307
     * Proactively evict cache blocks to free up space before cache is full.
308
     * 
309
     * This function attempts to evict blocks from both NORMAL and TTL queues to maintain 
310
     * cache size below high watermark. Unlike try_reserve() which blocks until space is freed,
311
     * this function initiates asynchronous eviction in background.
312
     * 
313
     * @param size Number of bytes to try to evict
314
     * @param cache_lock Lock that must be held while accessing cache data structures
315
     * 
316
     * @pre Caller must hold cache_lock
317
     * @pre _need_evict_cache_in_advance must be true
318
     * @pre _recycle_keys queue must have capacity for evicted blocks
319
     */
320
    void try_evict_in_advance(size_t size, std::lock_guard<std::mutex>& cache_lock);
321
322
    void update_ttl_atime(const UInt128Wrapper& hash);
323
324
    void pause_ttl_manager();
325
    void resume_ttl_manager();
326
327
    std::map<std::string, double> get_stats();
328
329
    // for be UTs
330
    std::map<std::string, double> get_stats_unsafe();
331
3
    [[nodiscard]] size_t need_update_lru_blocks_size_unsafe() const {
332
3
        return _need_update_lru_blocks.size();
333
3
    }
334
335
    using AccessRecord =
336
            std::unordered_map<AccessKeyAndOffset, LRUQueue::Iterator, KeyAndOffsetHash>;
337
338
    /// Used to track and control the cache access of each query.
339
    /// Through it, we can realize the processing of different queries by the cache layer.
340
    struct QueryFileCacheContext {
341
        LRUQueue lru_queue;
342
        AccessRecord records;
343
344
5
        QueryFileCacheContext(size_t max_cache_size) : lru_queue(max_cache_size, 0, 0) {}
345
346
        void remove(const UInt128Wrapper& hash, size_t offset,
347
                    std::lock_guard<std::mutex>& cache_lock);
348
349
        void reserve(const UInt128Wrapper& hash, size_t offset, size_t size,
350
                     std::lock_guard<std::mutex>& cache_lock);
351
352
69
        size_t get_max_cache_size() const { return lru_queue.get_max_size(); }
353
354
48
        size_t get_cache_size(std::lock_guard<std::mutex>& cache_lock) const {
355
48
            return lru_queue.get_capacity(cache_lock);
356
48
        }
357
358
50
        LRUQueue& queue() { return lru_queue; }
359
    };
360
361
    using QueryFileCacheContextPtr = std::shared_ptr<QueryFileCacheContext>;
362
    using QueryFileCacheContextMap = std::unordered_map<TUniqueId, QueryFileCacheContextPtr>;
363
364
    QueryFileCacheContextPtr get_query_context(const TUniqueId& query_id,
365
                                               std::lock_guard<std::mutex>&);
366
367
    void remove_query_context(const TUniqueId& query_id);
368
369
    QueryFileCacheContextPtr get_or_set_query_context(const TUniqueId& query_id,
370
                                                      std::lock_guard<std::mutex>& cache_lock,
371
                                                      int file_cache_query_limit_percent);
372
373
    /// Save a query context information, and adopt different cache policies
374
    /// for different queries through the context cache layer.
375
    struct QueryFileCacheContextHolder {
376
        QueryFileCacheContextHolder(const TUniqueId& query_id, BlockFileCache* mgr,
377
                                    QueryFileCacheContextPtr context)
378
7
                : query_id(query_id), mgr(mgr), context(context) {}
379
380
        QueryFileCacheContextHolder& operator=(const QueryFileCacheContextHolder&) = delete;
381
        QueryFileCacheContextHolder(const QueryFileCacheContextHolder&) = delete;
382
383
7
        ~QueryFileCacheContextHolder() {
384
            /// If only the query_map and the current holder hold the context_query,
385
            /// the query has been completed and the query_context is released.
386
7
            if (context) {
387
6
                context.reset();
388
6
                mgr->remove_query_context(query_id);
389
6
            }
390
7
        }
391
392
        const TUniqueId& query_id;
393
        BlockFileCache* mgr = nullptr;
394
        QueryFileCacheContextPtr context;
395
    };
396
    using QueryFileCacheContextHolderPtr = std::unique_ptr<QueryFileCacheContextHolder>;
397
    QueryFileCacheContextHolderPtr get_query_context_holder(const TUniqueId& query_id,
398
                                                            int file_cache_query_limit_percent);
399
400
0
    int64_t approximate_available_cache_size() const {
401
0
        return std::max<int64_t>(
402
0
                _cache_capacity_metrics->get_value() - _cur_cache_size_metrics->get_value(), 0);
403
0
    }
404
405
    Status report_file_cache_inconsistency(std::vector<std::string>& results);
406
    Status check_file_cache_consistency(InconsistencyContext& inconsistency_context);
407
408
private:
409
    struct ClearFileCacheResult {
410
        int64_t num_files_all = 0;
411
        int64_t num_cells_all = 0;
412
        int64_t num_cells_to_delete = 0;
413
        int64_t num_cells_wait_recycle = 0;
414
        int64_t num_recycle_drained = 0;
415
        int64_t wait_deleting_rounds = 0;
416
        int64_t elapsed_ms = 0;
417
        bool cancelled = false;
418
        Status status = Status::OK();
419
420
        std::string to_string(const std::string& path, std::string_view action) const;
421
    };
422
423
    ClearFileCacheResult clear_file_cache_async_impl();
424
    void append_clear_result(ClearFileCacheResult& result, const ClearFileCacheResult& other);
425
    size_t count_deleting_blocks_unlocked(std::lock_guard<std::mutex>& cache_lock) const;
426
    ClearFileCacheResult drain_recycle_keys(
427
            const std::shared_ptr<ClearFileCacheCancelToken>& cancel_token = nullptr);
428
    bool recycle_keys_idle();
429
    bool try_dequeue_recycle_key(FileCacheKey* key);
430
    Status remove_dequeued_recycle_key(const FileCacheKey& key);
431
    void refresh_metrics_unlocked(std::lock_guard<std::mutex>& cache_lock);
432
433
    LRUQueue& get_queue(FileCacheType type);
434
    const LRUQueue& get_queue(FileCacheType type) const;
435
436
    template <class T, class U>
437
        requires IsXLock<T> && IsXLock<U>
438
    void remove(FileBlockSPtr file_block, T& cache_lock, U& segment_lock, bool sync = true);
439
440
    FileBlocks get_impl(const UInt128Wrapper& hash, const CacheContext& context,
441
                        const FileBlock::Range& range, std::lock_guard<std::mutex>& cache_lock);
442
443
    template <class T>
444
        requires IsXLock<T>
445
    FileBlockCell* get_cell(const UInt128Wrapper& hash, size_t offset, T& cache_lock);
446
447
    virtual FileBlockCell* add_cell(const UInt128Wrapper& hash, const CacheContext& context,
448
                                    size_t offset, size_t size, FileBlock::State state,
449
                                    std::lock_guard<std::mutex>& cache_lock);
450
451
    Status initialize_unlocked(std::lock_guard<std::mutex>& cache_lock);
452
453
    void update_block_lru(FileBlockSPtr block, std::lock_guard<std::mutex>& cache_lock);
454
455
    void use_cell(const FileBlockCell& cell, FileBlocks* result, bool not_need_move,
456
                  std::lock_guard<std::mutex>& cache_lock);
457
458
    bool try_reserve_for_lru(const UInt128Wrapper& hash, QueryFileCacheContextPtr query_context,
459
                             const CacheContext& context, size_t offset, size_t size,
460
                             std::lock_guard<std::mutex>& cache_lock,
461
                             bool evict_in_advance = false);
462
463
    bool try_reserve_during_async_load(size_t size, std::lock_guard<std::mutex>& cache_lock);
464
465
    std::vector<FileCacheType> get_other_cache_type(FileCacheType cur_cache_type);
466
    std::vector<FileCacheType> get_other_cache_type_without_ttl(FileCacheType cur_cache_type);
467
468
    bool try_reserve_from_other_queue(FileCacheType cur_cache_type, size_t offset, int64_t cur_time,
469
                                      std::lock_guard<std::mutex>& cache_lock,
470
                                      bool evict_in_advance = false);
471
472
    size_t get_available_cache_size(FileCacheType cache_type) const;
473
474
    FileBlocks split_range_into_cells(const UInt128Wrapper& hash, const CacheContext& context,
475
                                      size_t offset, size_t size, FileBlock::State state,
476
                                      std::lock_guard<std::mutex>& cache_lock);
477
    FileBlocks split_range_into_skip_cache_blocks(const UInt128Wrapper& hash,
478
                                                  const CacheContext& context, size_t offset,
479
                                                  size_t size);
480
481
    std::string dump_structure_unlocked(const UInt128Wrapper& hash,
482
                                        std::lock_guard<std::mutex>& cache_lock);
483
484
    std::string dump_single_cache_type_unlocked(const UInt128Wrapper& hash, size_t offset,
485
                                                std::lock_guard<std::mutex>& cache_lock);
486
487
    void fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, const UInt128Wrapper& hash,
488
                                           const CacheContext& context,
489
                                           const FileBlock::Range& range,
490
                                           std::lock_guard<std::mutex>& cache_lock);
491
    void fill_holes_with_skip_cache_blocks(FileBlocks& file_blocks, const UInt128Wrapper& hash,
492
                                           const CacheContext& context,
493
                                           const FileBlock::Range& range);
494
495
    size_t get_used_cache_size_unlocked(FileCacheType type,
496
                                        std::lock_guard<std::mutex>& cache_lock) const;
497
498
    void check_disk_resource_limit();
499
    void check_need_evict_cache_in_advance();
500
501
    size_t get_available_cache_size_unlocked(FileCacheType type,
502
                                             std::lock_guard<std::mutex>& cache_lock) const;
503
504
    size_t get_file_blocks_num_unlocked(FileCacheType type,
505
                                        std::lock_guard<std::mutex>& cache_lock) const;
506
507
    bool need_to_move(FileCacheType cell_type, FileCacheType query_type) const;
508
509
    void run_background_monitor();
510
    void run_background_gc();
511
    void run_background_lru_log_replay();
512
    void run_background_lru_dump();
513
    void restore_lru_queues_from_disk(std::lock_guard<std::mutex>& cache_lock);
514
    void run_background_evict_in_advance();
515
    void run_background_block_lru_update();
516
517
    bool try_reserve_from_other_queue_by_time_interval(FileCacheType cur_type,
518
                                                       std::vector<FileCacheType> other_cache_types,
519
                                                       size_t size, int64_t cur_time,
520
                                                       std::lock_guard<std::mutex>& cache_lock,
521
                                                       bool evict_in_advance);
522
523
    bool try_reserve_from_other_queue_by_size(FileCacheType cur_type,
524
                                              std::vector<FileCacheType> other_cache_types,
525
                                              size_t size, std::lock_guard<std::mutex>& cache_lock,
526
                                              bool evict_in_advance);
527
528
    bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
529
                     bool evict_in_advance) const;
530
531
    void remove_file_blocks(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&, bool sync,
532
                            std::string& reason);
533
534
    void find_evict_candidates(LRUQueue& queue, size_t size, size_t cur_cache_size,
535
                               size_t& removed_size, std::vector<FileBlockCell*>& to_evict,
536
                               std::lock_guard<std::mutex>& cache_lock, size_t& cur_removed_size,
537
                               bool evict_in_advance);
538
539
    Status check_ofstream_status(std::ofstream& out, std::string& filename);
540
    Status dump_one_lru_entry(std::ofstream& out, std::string& filename, const UInt128Wrapper& hash,
541
                              size_t offset, size_t size);
542
    Status finalize_dump(std::ofstream& out, size_t entry_num, std::string& tmp_filename,
543
                         std::string& final_filename, size_t& file_size);
544
    Status check_ifstream_status(std::ifstream& in, std::string& filename);
545
    Status parse_dump_footer(std::ifstream& in, std::string& filename, size_t& entry_num);
546
    Status parse_one_lru_entry(std::ifstream& in, std::string& filename, UInt128Wrapper& hash,
547
                               size_t& offset, size_t& size);
548
    void remove_lru_dump_files();
549
550
    void clear_need_update_lru_blocks();
551
552
    // info
553
    std::string _cache_base_path;
554
    size_t _capacity = 0;
555
    size_t _max_file_block_size = 0;
556
557
    mutable std::mutex _mutex;
558
    bool _close {false};
559
    std::mutex _close_mtx;
560
    std::condition_variable _close_cv;
561
    std::thread _cache_background_monitor_thread;
562
    std::thread _cache_background_gc_thread;
563
    std::thread _cache_background_evict_in_advance_thread;
564
    std::thread _cache_background_lru_dump_thread;
565
    std::thread _cache_background_lru_log_replay_thread;
566
    std::thread _cache_background_block_lru_update_thread;
567
    std::atomic_bool _async_open_done {false};
568
    // disk space or inode is less than the specified value
569
    bool _disk_resource_limit_mode {false};
570
    bool _need_evict_cache_in_advance {false};
571
    bool _is_initialized {false};
572
    // Guarded by _mutex. Cache misses skip insertion while sync clear is draining old blocks.
573
    bool _clear_file_cache_sync_running {false};
574
575
    // strategy
576
    using FileBlocksByOffset = std::map<size_t, FileBlockCell>;
577
    using CachedFiles = std::unordered_map<UInt128Wrapper, FileBlocksByOffset, KeyHash>;
578
    CachedFiles _files;
579
    QueryFileCacheContextMap _query_map;
580
    size_t _cur_cache_size = 0;
581
    size_t _cur_ttl_size = 0;
582
    std::multimap<uint64_t, UInt128Wrapper> _time_to_key;
583
    std::unordered_map<UInt128Wrapper, uint64_t, KeyHash> _key_to_time;
584
    // The three queues are level queue.
585
    // It means as level1/level2/level3 queue.
586
    // but the level2 is maximum.
587
    // If some datas are importance, we can cache it into index queue
588
    // If some datas are just use once, we can cache it into disposable queue
589
    // The size proportion is [1:17:2].
590
    LRUQueue _index_queue;
591
    LRUQueue _normal_queue;
592
    LRUQueue _disposable_queue;
593
    LRUQueue _ttl_queue;
594
595
    // keys for async remove
596
    RecycleFileCacheKeys _recycle_keys;
597
    std::mutex _clear_mutex;
598
    std::mutex _recycle_keys_mutex;
599
    std::condition_variable _recycle_keys_cv;
600
    size_t _recycle_remove_inflight = 0;
601
602
    std::unique_ptr<LRUQueueRecorder> _lru_recorder;
603
    std::unique_ptr<CacheLRUDumper> _lru_dumper;
604
    std::unique_ptr<BlockFileCacheTtlMgr> _ttl_mgr;
605
606
    // metrics
607
    std::shared_ptr<bvar::Status<size_t>> _cache_capacity_metrics;
608
    std::shared_ptr<bvar::Status<size_t>> _cur_cache_size_metrics;
609
    std::shared_ptr<bvar::Status<size_t>> _cur_ttl_cache_size_metrics;
610
    std::shared_ptr<bvar::Status<size_t>> _cur_ttl_cache_lru_queue_cache_size_metrics;
611
    std::shared_ptr<bvar::Status<size_t>> _cur_ttl_cache_lru_queue_element_count_metrics;
612
    std::shared_ptr<bvar::Status<size_t>> _cur_normal_queue_element_count_metrics;
613
    std::shared_ptr<bvar::Status<size_t>> _cur_normal_queue_cache_size_metrics;
614
    std::shared_ptr<bvar::Status<size_t>> _cur_index_queue_element_count_metrics;
615
    std::shared_ptr<bvar::Status<size_t>> _cur_index_queue_cache_size_metrics;
616
    std::shared_ptr<bvar::Status<size_t>> _cur_disposable_queue_element_count_metrics;
617
    std::shared_ptr<bvar::Status<size_t>> _cur_disposable_queue_cache_size_metrics;
618
    std::array<std::shared_ptr<bvar::Adder<size_t>>, 4> _queue_evict_size_metrics;
619
    std::shared_ptr<bvar::Adder<size_t>> _total_read_size_metrics;
620
    std::shared_ptr<bvar::Adder<size_t>> _total_hit_size_metrics;
621
    std::shared_ptr<bvar::Adder<size_t>> _total_evict_size_metrics;
622
    std::shared_ptr<bvar::Adder<size_t>> _gc_evict_bytes_metrics;
623
    std::shared_ptr<bvar::Adder<size_t>> _gc_evict_count_metrics;
624
    std::shared_ptr<bvar::Adder<size_t>> _evict_by_time_metrics_matrix[4][4];
625
    std::shared_ptr<bvar::Adder<size_t>> _evict_by_size_metrics_matrix[4][4];
626
    std::shared_ptr<bvar::Adder<size_t>> _evict_by_self_lru_metrics_matrix[4];
627
    std::shared_ptr<bvar::Adder<size_t>> _evict_by_try_release;
628
629
    std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _num_hit_blocks_5m;
630
    std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _num_read_blocks_5m;
631
    std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _num_hit_blocks_1h;
632
    std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _num_read_blocks_1h;
633
634
    std::shared_ptr<bvar::Adder<size_t>> _num_read_blocks;
635
    std::shared_ptr<bvar::Adder<size_t>> _num_hit_blocks;
636
    std::shared_ptr<bvar::Adder<size_t>> _num_removed_blocks;
637
638
    std::shared_ptr<bvar::Adder<size_t>> _no_warmup_num_read_blocks;
639
    std::shared_ptr<bvar::Adder<size_t>> _no_warmup_num_hit_blocks;
640
641
    std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _no_warmup_num_hit_blocks_5m;
642
    std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _no_warmup_num_read_blocks_5m;
643
    std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _no_warmup_num_hit_blocks_1h;
644
    std::shared_ptr<bvar::Window<bvar::Adder<size_t>>> _no_warmup_num_read_blocks_1h;
645
646
    std::shared_ptr<bvar::Status<double>> _hit_ratio;
647
    std::shared_ptr<bvar::Status<double>> _hit_ratio_5m;
648
    std::shared_ptr<bvar::Status<double>> _hit_ratio_1h;
649
    std::shared_ptr<bvar::Status<double>> _no_warmup_hit_ratio;
650
    std::shared_ptr<bvar::Status<double>> _no_warmup_hit_ratio_5m;
651
    std::shared_ptr<bvar::Status<double>> _no_warmup_hit_ratio_1h;
652
    std::shared_ptr<bvar::Status<size_t>> _disk_limit_mode_metrics;
653
    std::shared_ptr<bvar::Status<size_t>> _need_evict_cache_in_advance_metrics;
654
    std::shared_ptr<bvar::Status<size_t>> _meta_store_write_queue_size_metrics;
655
656
    std::shared_ptr<bvar::LatencyRecorder> _cache_lock_wait_time_us;
657
    std::shared_ptr<bvar::LatencyRecorder> _get_or_set_latency_us;
658
    std::shared_ptr<bvar::LatencyRecorder> _storage_sync_remove_latency_us;
659
    std::shared_ptr<bvar::LatencyRecorder> _storage_retry_sync_remove_latency_us;
660
    std::shared_ptr<bvar::LatencyRecorder> _storage_async_remove_latency_us;
661
    std::shared_ptr<bvar::LatencyRecorder> _evict_in_advance_latency_us;
662
    std::shared_ptr<bvar::LatencyRecorder> _recycle_keys_length_recorder;
663
    std::shared_ptr<bvar::LatencyRecorder> _update_lru_blocks_latency_us;
664
    std::shared_ptr<bvar::LatencyRecorder> _need_update_lru_blocks_length_recorder;
665
    std::shared_ptr<bvar::LatencyRecorder> _ttl_gc_latency_us;
666
667
    std::shared_ptr<bvar::LatencyRecorder> _shadow_queue_levenshtein_distance;
668
    // keep _storage last so it will deconstruct first
669
    // otherwise, load_cache_info_into_memory might crash
670
    // coz it will use other members of BlockFileCache
671
    // so join this async load thread first
672
    std::unique_ptr<FileCacheStorage> _storage;
673
    std::shared_ptr<bvar::LatencyRecorder> _lru_dump_latency_us;
674
    std::mutex _dump_lru_queues_mtx;
675
    NeedUpdateLRUBlocks _need_update_lru_blocks;
676
};
677
678
} // namespace doris::io