Coverage Report

Created: 2026-03-16 21:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/memory/lru_cache_policy.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <fmt/format.h>
21
22
#include <memory>
23
24
#include "common/be_mock_util.h"
25
#include "runtime/memory/cache_policy.h"
26
#include "runtime/memory/lru_cache_value_base.h"
27
#include "runtime/memory/mem_tracker_limiter.h"
28
#include "runtime/thread_context.h"
29
#include "util/lru_cache.h"
30
#include "util/time.h"
31
32
namespace doris {
33
#include "common/compile_check_begin.h"
34
35
// Base of lru cache; allows pruning stale entries and pruning all entries.
36
class LRUCachePolicy : public CachePolicy {
37
public:
38
    LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type,
39
                   uint32_t stale_sweep_time_s, uint32_t num_shards,
40
                   uint32_t element_count_capacity, bool enable_prune, bool is_lru_k)
41
974
            : CachePolicy(type, capacity, stale_sweep_time_s, enable_prune),
42
974
              _lru_cache_type(lru_cache_type) {
43
974
        if (check_capacity(capacity, num_shards)) {
44
954
            _cache = std::shared_ptr<ShardedLRUCache>(
45
954
                    new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards,
46
954
                                        element_count_capacity, is_lru_k));
47
954
        } else {
48
20
            _cache = std::make_shared<doris::DummyLRUCache>();
49
20
        }
50
974
        _init_mem_tracker(lru_cache_type_string(lru_cache_type));
51
974
        CacheManager::instance()->register_cache(this);
52
974
    }
53
54
    LRUCachePolicy(CacheType type, size_t capacity, LRUCacheType lru_cache_type,
55
                   uint32_t stale_sweep_time_s, uint32_t num_shards,
56
                   uint32_t element_count_capacity,
57
                   CacheValueTimeExtractor cache_value_time_extractor,
58
                   bool cache_value_check_timestamp, bool enable_prune, bool is_lru_k)
59
144
            : CachePolicy(type, capacity, stale_sweep_time_s, enable_prune),
60
144
              _lru_cache_type(lru_cache_type) {
61
144
        if (check_capacity(capacity, num_shards)) {
62
117
            _cache = std::shared_ptr<ShardedLRUCache>(
63
117
                    new ShardedLRUCache(type_string(type), capacity, lru_cache_type, num_shards,
64
117
                                        cache_value_time_extractor, cache_value_check_timestamp,
65
117
                                        element_count_capacity, is_lru_k));
66
117
        } else {
67
27
            _cache = std::make_shared<doris::DummyLRUCache>();
68
27
        }
69
144
        _init_mem_tracker(lru_cache_type_string(lru_cache_type));
70
144
        CacheManager::instance()->register_cache(this);
71
144
    }
72
73
0
    void reset_cache() { _cache.reset(); }
74
75
1.11k
    bool check_capacity(size_t capacity, uint32_t num_shards) {
76
1.11k
        if (capacity == 0 || capacity < num_shards) {
77
47
            LOG(INFO) << fmt::format(
78
47
                    "{} lru cache capacity({} B) {} num_shards({}), will be disabled.",
79
47
                    type_string(type()), capacity, capacity == 0 ? "is 0, ignore" : "less than",
80
47
                    num_shards);
81
47
            _enable_prune = false;
82
47
            return false;
83
47
        }
84
1.07k
        return true;
85
1.11k
    }
86
87
1.11k
    static std::string lru_cache_type_string(LRUCacheType type) {
88
1.11k
        switch (type) {
89
255
        case LRUCacheType::SIZE:
90
255
            return "size";
91
863
        case LRUCacheType::NUMBER:
92
863
            return "number";
93
0
        default:
94
0
            throw Exception(
95
0
                    Status::FatalError("not match type of lru cache:{}", static_cast<int>(type)));
96
1.11k
        }
97
1.11k
    }
98
99
11.0k
    std::shared_ptr<MemTrackerLimiter> mem_tracker() const {
100
11.0k
        DCHECK(_mem_tracker != nullptr);
101
11.0k
        return _mem_tracker;
102
11.0k
    }
103
104
28
    int64_t mem_consumption() {
105
28
        DCHECK(_mem_tracker != nullptr);
106
28
        return _mem_tracker->consumption();
107
28
    }
108
109
0
    int64_t value_mem_consumption() {
110
0
        DCHECK(_value_mem_tracker != nullptr);
111
0
        return _value_mem_tracker->consumption();
112
0
    }
113
114
    // Insert will consume tracking_bytes to _mem_tracker and cache value destroy will release tracking_bytes.
115
    // If LRUCacheType::SIZE, value_tracking_bytes usually equal to charge.
116
// If LRUCacheType::NUMBER, value_tracking_bytes usually not equal to charge, at this time charge is a weight.
117
    // If LRUCacheType::SIZE and value_tracking_bytes equals 0, memory must be tracked in Doris Allocator,
118
//    cache value is allocated using Allocator.
119
// If LRUCacheType::NUMBER and value_tracking_bytes equals 0, we usually cannot accurately track the memory size currently,
120
    //    only tracking handle_size(106).
121
    Cache::Handle* insert(const CacheKey& key, void* value, size_t charge,
122
                          size_t value_tracking_bytes,
123
328k
                          CachePriority priority = CachePriority::NORMAL) {
124
328k
        size_t tracking_bytes = sizeof(LRUHandle) - 1 + key.size() + value_tracking_bytes;
125
328k
        if (value != nullptr) {
126
328k
            ((LRUCacheValueBase*)value)
127
328k
                    ->set_tracking_bytes(tracking_bytes, _mem_tracker, value_tracking_bytes,
128
328k
                                         _value_mem_tracker);
129
328k
        }
130
328k
        return _cache->insert(key, value, charge, priority);
131
328k
    }
132
133
0
    void for_each_entry(const std::function<void(const LRUHandle*)>& visitor) {
134
0
        _cache->for_each_entry(visitor);
135
0
    }
136
137
354k
    Cache::Handle* lookup(const CacheKey& key) { return _cache->lookup(key); }
138
139
634k
    MOCK_FUNCTION void release(Cache::Handle* handle) { _cache->release(handle); }
140
141
315k
    MOCK_FUNCTION void* value(Cache::Handle* handle) { return _cache->value(handle); }
142
143
204
    void erase(const CacheKey& key) { _cache->erase(key); }
144
145
16.0k
    int64_t get_usage() { return _cache->get_usage(); }
146
147
10
    size_t get_element_count() { return _cache->get_element_count(); }
148
149
16.0k
    size_t get_capacity() override { return _cache->get_capacity(); }
150
151
2
    uint64_t new_id() { return _cache->new_id(); };
152
153
    // Subclass can override this method to determine whether to do the minor or full gc
154
0
    virtual bool exceed_prune_limit() {
155
0
        return _lru_cache_type == LRUCacheType::SIZE ? mem_consumption() > CACHE_MIN_PRUNE_SIZE
156
0
                                                     : get_usage() > CACHE_MIN_PRUNE_NUMBER;
157
0
    }
158
159
    // Try to prune the cache if expired.
160
0
    void prune_stale() override {
161
0
        std::lock_guard<std::mutex> l(_lock);
162
0
        COUNTER_SET(_freed_entrys_counter, (int64_t)0);
163
0
        COUNTER_SET(_freed_memory_counter, (int64_t)0);
164
0
        if (_stale_sweep_time_s <= 0 || std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
165
0
            return;
166
0
        }
167
0
        if (exceed_prune_limit()) {
168
0
            COUNTER_SET(_cost_timer, (int64_t)0);
169
0
            const int64_t curtime = UnixMillis();
170
0
            auto pred = [this, curtime](const LRUHandle* handle) -> bool {
171
0
                return static_cast<bool>((handle->last_visit_time + _stale_sweep_time_s * 1000) <
172
0
                                         curtime);
173
0
            };
174
175
0
            LOG(INFO) << fmt::format("[MemoryGC] {} prune stale start, consumption {}, usage {}",
176
0
                                     type_string(_type), mem_consumption(), get_usage());
177
0
            {
178
0
                SCOPED_TIMER(_cost_timer);
179
                // Prune cache in lazy mode to save cpu and minimize the time holding write lock
180
0
                PrunedInfo pruned_info = _cache->prune_if(pred, true);
181
0
                COUNTER_SET(_freed_entrys_counter, pruned_info.pruned_count);
182
0
                COUNTER_SET(_freed_memory_counter, pruned_info.pruned_size);
183
0
            }
184
0
            COUNTER_UPDATE(_prune_stale_number_counter, 1);
185
0
            LOG(INFO) << fmt::format(
186
0
                    "[MemoryGC] {} prune stale {} entries, {} bytes, cost {}, {} times prune",
187
0
                    type_string(_type), _freed_entrys_counter->value(),
188
0
                    _freed_memory_counter->value(), _cost_timer->value(),
189
0
                    _prune_stale_number_counter->value());
190
0
        } else {
191
0
            if (_lru_cache_type == LRUCacheType::SIZE) {
192
0
                LOG(INFO) << fmt::format(
193
0
                        "[MemoryGC] {} not need prune stale, LRUCacheType::SIZE consumption {} "
194
0
                        "less "
195
0
                        "than CACHE_MIN_PRUNE_SIZE {}",
196
0
                        type_string(_type), mem_consumption(), CACHE_MIN_PRUNE_SIZE);
197
0
            } else if (_lru_cache_type == LRUCacheType::NUMBER) {
198
0
                LOG(INFO) << fmt::format(
199
0
                        "[MemoryGC] {} not need prune stale, LRUCacheType::NUMBER usage {} less "
200
0
                        "than "
201
0
                        "CACHE_MIN_PRUNE_NUMBER {}",
202
0
                        type_string(_type), get_usage(), CACHE_MIN_PRUNE_NUMBER);
203
0
            }
204
0
        }
205
0
    }
206
207
0
    void prune_all(bool force) override {
208
0
        std::lock_guard<std::mutex> l(_lock);
209
0
        COUNTER_SET(_freed_entrys_counter, (int64_t)0);
210
0
        COUNTER_SET(_freed_memory_counter, (int64_t)0);
211
0
        if (std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
212
0
            return;
213
0
        }
214
0
        if ((force && mem_consumption() != 0) || exceed_prune_limit()) {
215
0
            COUNTER_SET(_cost_timer, (int64_t)0);
216
0
            LOG(INFO) << fmt::format("[MemoryGC] {} prune all start, consumption {}, usage {}",
217
0
                                     type_string(_type), mem_consumption(), get_usage());
218
0
            {
219
0
                SCOPED_TIMER(_cost_timer);
220
0
                PrunedInfo pruned_info = _cache->prune();
221
0
                COUNTER_SET(_freed_entrys_counter, pruned_info.pruned_count);
222
0
                COUNTER_SET(_freed_memory_counter, pruned_info.pruned_size);
223
0
            }
224
0
            COUNTER_UPDATE(_prune_all_number_counter, 1);
225
0
            LOG(INFO) << fmt::format(
226
0
                    "[MemoryGC] {} prune all {} entries, {} bytes, cost {}, {} times prune, is "
227
0
                    "force: {}",
228
0
                    type_string(_type), _freed_entrys_counter->value(),
229
0
                    _freed_memory_counter->value(), _cost_timer->value(),
230
0
                    _prune_all_number_counter->value(), force);
231
0
        } else {
232
0
            if (_lru_cache_type == LRUCacheType::SIZE) {
233
0
                LOG(INFO) << fmt::format(
234
0
                        "[MemoryGC] {} not need prune all, force is {}, LRUCacheType::SIZE "
235
0
                        "consumption {}, "
236
0
                        "CACHE_MIN_PRUNE_SIZE {}",
237
0
                        type_string(_type), force, mem_consumption(), CACHE_MIN_PRUNE_SIZE);
238
0
            } else if (_lru_cache_type == LRUCacheType::NUMBER) {
239
0
                LOG(INFO) << fmt::format(
240
0
                        "[MemoryGC] {} not need prune all, force is {}, LRUCacheType::NUMBER "
241
0
                        "usage {}, CACHE_MIN_PRUNE_NUMBER {}",
242
0
                        type_string(_type), force, get_usage(), CACHE_MIN_PRUNE_NUMBER);
243
0
            }
244
0
        }
245
0
    }
246
247
14
    int64_t adjust_capacity_weighted_unlocked(double adjust_weighted) {
248
14
        auto capacity =
249
14
                static_cast<size_t>(static_cast<double>(_initial_capacity) * adjust_weighted);
250
14
        COUNTER_SET(_freed_entrys_counter, (int64_t)0);
251
14
        COUNTER_SET(_freed_memory_counter, (int64_t)0);
252
14
        COUNTER_SET(_cost_timer, (int64_t)0);
253
14
        if (std::dynamic_pointer_cast<doris::DummyLRUCache>(_cache)) {
254
0
            return 0;
255
0
        }
256
14
        if (!_enable_prune) {
257
0
            LOG(INFO) << "[MemoryGC] " << type_string(_type)
258
0
                      << " cache prune disabled, so could not adjust capacity to free memory";
259
0
            return 0;
260
0
        }
261
14
        size_t old_capacity = get_capacity();
262
14
        int64_t old_mem_consumption = mem_consumption();
263
14
        int64_t old_usage = get_usage();
264
14
        {
265
14
            SCOPED_TIMER(_cost_timer);
266
14
            PrunedInfo pruned_info = _cache->set_capacity(capacity);
267
14
            COUNTER_SET(_freed_entrys_counter, pruned_info.pruned_count);
268
14
            COUNTER_SET(_freed_memory_counter, pruned_info.pruned_size);
269
14
        }
270
14
        COUNTER_UPDATE(_adjust_capacity_weighted_number_counter, 1);
271
14
        LOG(INFO) << fmt::format(
272
14
                "[MemoryGC] {} update capacity, old <capacity {}, consumption {}, usage {}>, "
273
14
                "adjust_weighted {}, new <capacity {}, consumption {}, usage {}>, prune {} "
274
14
                "entries, {} bytes, cost {}, {} times prune",
275
14
                type_string(_type), old_capacity, old_mem_consumption, old_usage, adjust_weighted,
276
14
                get_capacity(), mem_consumption(), get_usage(), _freed_entrys_counter->value(),
277
14
                _freed_memory_counter->value(), _cost_timer->value(),
278
14
                _adjust_capacity_weighted_number_counter->value());
279
14
        return _freed_entrys_counter->value();
280
14
    }
281
282
11
    int64_t adjust_capacity_weighted(double adjust_weighted) override {
283
11
        std::lock_guard<std::mutex> l(_lock);
284
11
        return adjust_capacity_weighted_unlocked(adjust_weighted);
285
11
    }
286
287
3
    int64_t reset_initial_capacity(double adjust_weighted) override {
288
3
        DCHECK(adjust_weighted != 0.0); // otherwise initial_capacity will always to be 0.
289
3
        std::lock_guard<std::mutex> l(_lock);
290
3
        int64_t prune_num = adjust_capacity_weighted_unlocked(adjust_weighted);
291
3
        size_t old_capacity = _initial_capacity;
292
3
        _initial_capacity =
293
3
                static_cast<size_t>(static_cast<double>(_initial_capacity) * adjust_weighted);
294
3
        LOG(INFO) << fmt::format(
295
3
                "[MemoryGC] {} reset initial capacity, new capacity {}, old capacity {}, prune num "
296
3
                "{}",
297
3
                type_string(_type), _initial_capacity, old_capacity, prune_num);
298
3
        return prune_num;
299
3
    };
300
301
protected:
302
1.11k
    void _init_mem_tracker(const std::string& type_name) {
303
1.11k
        if (std::find(CachePolicy::MetadataCache.begin(), CachePolicy::MetadataCache.end(),
304
1.11k
                      _type) == CachePolicy::MetadataCache.end()) {
305
1.11k
            _mem_tracker = MemTrackerLimiter::create_shared(
306
1.11k
                    MemTrackerLimiter::Type::CACHE,
307
1.11k
                    fmt::format("{}[{}]", type_string(_type), type_name));
308
1.11k
        } else {
309
2
            _mem_tracker = MemTrackerLimiter::create_shared(
310
2
                    MemTrackerLimiter::Type::METADATA,
311
2
                    fmt::format("{}[{}]", type_string(_type), type_name));
312
2
        }
313
1.11k
        _value_mem_tracker = std::make_shared<MemTracker>(
314
1.11k
                fmt::format("{}::Value[{}]", type_string(_type), type_name));
315
1.11k
    }
316
317
    // if check_capacity failed, will return dummy lru cache,
318
    // compatible with ShardedLRUCache usage, but will not actually cache.
319
    std::shared_ptr<Cache> _cache;
320
    std::mutex _lock;
321
    LRUCacheType _lru_cache_type;
322
323
    std::shared_ptr<MemTrackerLimiter> _mem_tracker;
324
    std::shared_ptr<MemTracker> _value_mem_tracker;
325
};
326
327
#include "common/compile_check_end.h"
328
} // namespace doris