Coverage Report

Created: 2026-06-08 20:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/shard_mem_cache.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/cache/shard_mem_cache.h"
19
20
#include <bvar/bvar.h>
21
22
#include <cstring>
23
#include <memory>
24
#include <mutex>
25
26
#include "common/logging.h"
27
28
namespace doris::io {
29
30
bvar::Adder<uint64_t> g_mem_file_cache_duplicate_key_replace_num(
31
        "file_cache", "memory_duplicate_key_replace_num");
32
33
6
Status ShardMemHashTable::remove(const FileCacheKey& key) {
34
6
    std::unique_lock<std::shared_mutex> lock(_shard_mt);
35
6
    auto map_key = std::make_pair(key.hash, key.offset);
36
6
    auto iter = _cache_map.find(map_key);
37
6
    if (iter == _cache_map.end()) {
38
0
        LOG_WARNING("key not found in cache map")
39
0
                .tag("hash", key.hash.to_string())
40
0
                .tag("offset", key.offset);
41
0
        return Status::IOError("key not found in in-memory cache map when remove");
42
0
    }
43
6
    _cache_map.erase(iter);
44
6
    return Status::OK();
45
6
}
46
47
0
Status ShardMemHashTable::append(const FileCacheKey& key, const Slice& value) {
48
0
    return appendv(key, &value, 1);
49
0
}
50
51
6.06k
Status ShardMemHashTable::appendv(const FileCacheKey& key, const Slice* values, size_t value_cnt) {
52
6.06k
    std::unique_lock<std::shared_mutex> lock(_shard_mt);
53
54
6.06k
    auto map_key = std::make_pair(key.hash, key.offset);
55
6.06k
    auto iter = _cache_map.find(map_key);
56
6.06k
    size_t total_size = 0;
57
12.1k
    for (size_t idx = 0; idx < value_cnt; ++idx) {
58
6.06k
        total_size += values[idx].size;
59
6.06k
    }
60
6.06k
    if (total_size == 0) {
61
0
        return Status::InvalidArgument("appendv requires non-empty slices");
62
0
    }
63
64
6.06k
    MemBlock mem_block;
65
6.06k
    mem_block.addr = std::shared_ptr<char[]>(new char[total_size], std::default_delete<char[]>());
66
6.06k
    mem_block.size = total_size;
67
6.06k
    DCHECK(mem_block.addr != nullptr);
68
6.06k
    char* dst = mem_block.addr.get();
69
12.1k
    for (size_t idx = 0; idx < value_cnt; ++idx) {
70
6.06k
        memcpy(dst, values[idx].data, values[idx].size);
71
6.06k
        dst += values[idx].size;
72
6.06k
    }
73
6.06k
    if (iter != _cache_map.end()) {
74
0
        g_mem_file_cache_duplicate_key_replace_num << 1;
75
0
        LOG_WARNING("replace duplicate key in in-memory cache map")
76
0
                .tag("hash", key.hash.to_string())
77
0
                .tag("offset", key.offset)
78
0
                .tag("old_size", iter->second.size)
79
0
                .tag("new_size", total_size);
80
        // Replacing the payload keeps existing readers alive while reusing the same key.
81
0
        iter->second = std::move(mem_block);
82
0
        return Status::OK();
83
0
    }
84
85
6.06k
    _cache_map.emplace(map_key, std::move(mem_block));
86
6.06k
    return Status::OK();
87
6.06k
}
88
89
6
Status ShardMemHashTable::append_iobuf(const FileCacheKey& key, const butil::IOBuf& value) {
90
6
    std::unique_lock<std::shared_mutex> lock(_shard_mt);
91
92
6
    auto map_key = std::make_pair(key.hash, key.offset);
93
6
    auto iter = _cache_map.find(map_key);
94
95
6
    const size_t total_size = value.length();
96
6
    if (total_size == 0) {
97
0
        return Status::InvalidArgument("append_iobuf requires non-empty payload");
98
0
    }
99
100
6
    MemBlock mem_block;
101
6
    mem_block.payload.append(value);
102
6
    mem_block.size = total_size;
103
6
    mem_block.use_iobuf = true;
104
6
    if (iter != _cache_map.end()) {
105
0
        g_mem_file_cache_duplicate_key_replace_num << 1;
106
0
        LOG_WARNING("replace duplicate key in in-memory cache map")
107
0
                .tag("hash", key.hash.to_string())
108
0
                .tag("offset", key.offset)
109
0
                .tag("old_size", iter->second.size)
110
0
                .tag("new_size", total_size);
111
0
        iter->second = std::move(mem_block);
112
0
        return Status::OK();
113
0
    }
114
115
6
    _cache_map.emplace(map_key, std::move(mem_block));
116
6
    return Status::OK();
117
6
}
118
119
703k
Status ShardMemHashTable::read(const FileCacheKey& key, size_t value_offset, Slice buffer) {
120
703k
    std::shared_lock<std::shared_mutex> lock(_shard_mt);
121
703k
    auto map_key = std::make_pair(key.hash, key.offset);
122
703k
    auto iter = _cache_map.find(map_key);
123
703k
    if (iter == _cache_map.end()) {
124
0
        LOG_WARNING("key not found in cache map")
125
0
                .tag("hash", key.hash.to_string())
126
0
                .tag("offset", key.offset);
127
0
        return Status::IOError("key not found in in-memory cache map when read");
128
0
    }
129
130
703k
    const auto& mem_block = iter->second;
131
704k
    if (value_offset > mem_block.size || buffer.size > mem_block.size - value_offset) {
132
0
        return Status::InternalError(
133
0
                "read buffer exceeds cached block, key={}, offset={}, request_offset={}, "
134
0
                "request_size={}, cached_size={}",
135
0
                key.hash.to_string(), key.offset, value_offset, buffer.size, mem_block.size);
136
0
    }
137
703k
    if (mem_block.use_iobuf) {
138
1
        const size_t copied = mem_block.payload.copy_to(buffer.data, buffer.size, value_offset);
139
1
        if (copied != buffer.size) {
140
0
            return Status::InternalError(
141
0
                    "short read from iobuf cache block, key={}, offset={}, request_offset={}, "
142
0
                    "request_size={}, actual_size={}",
143
0
                    key.hash.to_string(), key.offset, value_offset, buffer.size, copied);
144
0
        }
145
1
        return Status::OK();
146
1
    }
147
703k
    DCHECK(mem_block.addr != nullptr);
148
703k
    memcpy(buffer.data, mem_block.addr.get() + value_offset, buffer.size);
149
703k
    return Status::OK();
150
703k
}
151
152
Status ShardMemHashTable::read_to_iobuf(const FileCacheKey& key, size_t value_offset,
153
3
                                        size_t bytes_req, butil::IOBuf* out, size_t* bytes_read) {
154
3
    if (out == nullptr || bytes_read == nullptr) {
155
0
        return Status::InvalidArgument("read_to_iobuf requires non-null out and bytes_read");
156
0
    }
157
3
    std::shared_lock<std::shared_mutex> lock(_shard_mt);
158
3
    auto map_key = std::make_pair(key.hash, key.offset);
159
3
    auto iter = _cache_map.find(map_key);
160
3
    if (iter == _cache_map.end()) {
161
0
        LOG_WARNING("key not found in cache map")
162
0
                .tag("hash", key.hash.to_string())
163
0
                .tag("offset", key.offset);
164
0
        return Status::IOError("key not found in in-memory cache map when read_to_iobuf");
165
0
    }
166
3
    const auto& mem_block = iter->second;
167
3
    if (value_offset > mem_block.size || bytes_req > mem_block.size - value_offset) {
168
0
        return Status::InternalError(
169
0
                "read iobuf exceeds cached block, key={}, offset={}, request_offset={}, "
170
0
                "request_size={}, cached_size={}",
171
0
                key.hash.to_string(), key.offset, value_offset, bytes_req, mem_block.size);
172
0
    }
173
3
    if (mem_block.use_iobuf) {
174
1
        const size_t appended = mem_block.payload.append_to(out, bytes_req, value_offset);
175
1
        if (appended != bytes_req) {
176
0
            return Status::InternalError(
177
0
                    "short append_to from iobuf cache block, key={}, offset={}, request_offset={}, "
178
0
                    "request_size={}, actual_size={}",
179
0
                    key.hash.to_string(), key.offset, value_offset, bytes_req, appended);
180
0
        }
181
2
    } else {
182
2
        DCHECK(mem_block.addr != nullptr);
183
2
        out->append(mem_block.addr.get() + value_offset, bytes_req);
184
2
    }
185
3
    *bytes_read = bytes_req;
186
3
    return Status::OK();
187
3
}
188
189
24.5k
Status ShardMemHashTable::clear() {
190
24.5k
    std::unique_lock<std::shared_mutex> lock(_shard_mt);
191
24.5k
    _cache_map.clear();
192
24.5k
    return Status::OK();
193
24.5k
}
194
195
} // namespace doris::io