be/src/io/cache/shard_mem_cache.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/cache/shard_mem_cache.h" |
19 | | |
20 | | #include <bvar/bvar.h> |
21 | | |
22 | | #include <cstring> |
23 | | #include <memory> |
24 | | #include <mutex> |
25 | | |
26 | | #include "common/logging.h" |
27 | | |
28 | | namespace doris::io { |
29 | | |
30 | | bvar::Adder<uint64_t> g_mem_file_cache_duplicate_key_replace_num( |
31 | | "file_cache", "memory_duplicate_key_replace_num"); |
32 | | |
33 | 6 | Status ShardMemHashTable::remove(const FileCacheKey& key) { |
34 | 6 | std::unique_lock<std::shared_mutex> lock(_shard_mt); |
35 | 6 | auto map_key = std::make_pair(key.hash, key.offset); |
36 | 6 | auto iter = _cache_map.find(map_key); |
37 | 6 | if (iter == _cache_map.end()) { |
38 | 0 | LOG_WARNING("key not found in cache map") |
39 | 0 | .tag("hash", key.hash.to_string()) |
40 | 0 | .tag("offset", key.offset); |
41 | 0 | return Status::IOError("key not found in in-memory cache map when remove"); |
42 | 0 | } |
43 | 6 | _cache_map.erase(iter); |
44 | 6 | return Status::OK(); |
45 | 6 | } |
46 | | |
47 | 0 | Status ShardMemHashTable::append(const FileCacheKey& key, const Slice& value) { |
48 | 0 | return appendv(key, &value, 1); |
49 | 0 | } |
50 | | |
51 | 6.06k | Status ShardMemHashTable::appendv(const FileCacheKey& key, const Slice* values, size_t value_cnt) { |
52 | 6.06k | std::unique_lock<std::shared_mutex> lock(_shard_mt); |
53 | | |
54 | 6.06k | auto map_key = std::make_pair(key.hash, key.offset); |
55 | 6.06k | auto iter = _cache_map.find(map_key); |
56 | 6.06k | size_t total_size = 0; |
57 | 12.1k | for (size_t idx = 0; idx < value_cnt; ++idx) { |
58 | 6.06k | total_size += values[idx].size; |
59 | 6.06k | } |
60 | 6.06k | if (total_size == 0) { |
61 | 0 | return Status::InvalidArgument("appendv requires non-empty slices"); |
62 | 0 | } |
63 | | |
64 | 6.06k | MemBlock mem_block; |
65 | 6.06k | mem_block.addr = std::shared_ptr<char[]>(new char[total_size], std::default_delete<char[]>()); |
66 | 6.06k | mem_block.size = total_size; |
67 | 6.06k | DCHECK(mem_block.addr != nullptr); |
68 | 6.06k | char* dst = mem_block.addr.get(); |
69 | 12.1k | for (size_t idx = 0; idx < value_cnt; ++idx) { |
70 | 6.06k | memcpy(dst, values[idx].data, values[idx].size); |
71 | 6.06k | dst += values[idx].size; |
72 | 6.06k | } |
73 | 6.06k | if (iter != _cache_map.end()) { |
74 | 0 | g_mem_file_cache_duplicate_key_replace_num << 1; |
75 | 0 | LOG_WARNING("replace duplicate key in in-memory cache map") |
76 | 0 | .tag("hash", key.hash.to_string()) |
77 | 0 | .tag("offset", key.offset) |
78 | 0 | .tag("old_size", iter->second.size) |
79 | 0 | .tag("new_size", total_size); |
80 | | // Replacing the payload keeps existing readers alive while reusing the same key. |
81 | 0 | iter->second = std::move(mem_block); |
82 | 0 | return Status::OK(); |
83 | 0 | } |
84 | | |
85 | 6.06k | _cache_map.emplace(map_key, std::move(mem_block)); |
86 | 6.06k | return Status::OK(); |
87 | 6.06k | } |
88 | | |
89 | 6 | Status ShardMemHashTable::append_iobuf(const FileCacheKey& key, const butil::IOBuf& value) { |
90 | 6 | std::unique_lock<std::shared_mutex> lock(_shard_mt); |
91 | | |
92 | 6 | auto map_key = std::make_pair(key.hash, key.offset); |
93 | 6 | auto iter = _cache_map.find(map_key); |
94 | | |
95 | 6 | const size_t total_size = value.length(); |
96 | 6 | if (total_size == 0) { |
97 | 0 | return Status::InvalidArgument("append_iobuf requires non-empty payload"); |
98 | 0 | } |
99 | | |
100 | 6 | MemBlock mem_block; |
101 | 6 | mem_block.payload.append(value); |
102 | 6 | mem_block.size = total_size; |
103 | 6 | mem_block.use_iobuf = true; |
104 | 6 | if (iter != _cache_map.end()) { |
105 | 0 | g_mem_file_cache_duplicate_key_replace_num << 1; |
106 | 0 | LOG_WARNING("replace duplicate key in in-memory cache map") |
107 | 0 | .tag("hash", key.hash.to_string()) |
108 | 0 | .tag("offset", key.offset) |
109 | 0 | .tag("old_size", iter->second.size) |
110 | 0 | .tag("new_size", total_size); |
111 | 0 | iter->second = std::move(mem_block); |
112 | 0 | return Status::OK(); |
113 | 0 | } |
114 | | |
115 | 6 | _cache_map.emplace(map_key, std::move(mem_block)); |
116 | 6 | return Status::OK(); |
117 | 6 | } |
118 | | |
119 | 703k | Status ShardMemHashTable::read(const FileCacheKey& key, size_t value_offset, Slice buffer) { |
120 | 703k | std::shared_lock<std::shared_mutex> lock(_shard_mt); |
121 | 703k | auto map_key = std::make_pair(key.hash, key.offset); |
122 | 703k | auto iter = _cache_map.find(map_key); |
123 | 703k | if (iter == _cache_map.end()) { |
124 | 0 | LOG_WARNING("key not found in cache map") |
125 | 0 | .tag("hash", key.hash.to_string()) |
126 | 0 | .tag("offset", key.offset); |
127 | 0 | return Status::IOError("key not found in in-memory cache map when read"); |
128 | 0 | } |
129 | | |
130 | 703k | const auto& mem_block = iter->second; |
131 | 704k | if (value_offset > mem_block.size || buffer.size > mem_block.size - value_offset) { |
132 | 0 | return Status::InternalError( |
133 | 0 | "read buffer exceeds cached block, key={}, offset={}, request_offset={}, " |
134 | 0 | "request_size={}, cached_size={}", |
135 | 0 | key.hash.to_string(), key.offset, value_offset, buffer.size, mem_block.size); |
136 | 0 | } |
137 | 703k | if (mem_block.use_iobuf) { |
138 | 1 | const size_t copied = mem_block.payload.copy_to(buffer.data, buffer.size, value_offset); |
139 | 1 | if (copied != buffer.size) { |
140 | 0 | return Status::InternalError( |
141 | 0 | "short read from iobuf cache block, key={}, offset={}, request_offset={}, " |
142 | 0 | "request_size={}, actual_size={}", |
143 | 0 | key.hash.to_string(), key.offset, value_offset, buffer.size, copied); |
144 | 0 | } |
145 | 1 | return Status::OK(); |
146 | 1 | } |
147 | 703k | DCHECK(mem_block.addr != nullptr); |
148 | 703k | memcpy(buffer.data, mem_block.addr.get() + value_offset, buffer.size); |
149 | 703k | return Status::OK(); |
150 | 703k | } |
151 | | |
152 | | Status ShardMemHashTable::read_to_iobuf(const FileCacheKey& key, size_t value_offset, |
153 | 3 | size_t bytes_req, butil::IOBuf* out, size_t* bytes_read) { |
154 | 3 | if (out == nullptr || bytes_read == nullptr) { |
155 | 0 | return Status::InvalidArgument("read_to_iobuf requires non-null out and bytes_read"); |
156 | 0 | } |
157 | 3 | std::shared_lock<std::shared_mutex> lock(_shard_mt); |
158 | 3 | auto map_key = std::make_pair(key.hash, key.offset); |
159 | 3 | auto iter = _cache_map.find(map_key); |
160 | 3 | if (iter == _cache_map.end()) { |
161 | 0 | LOG_WARNING("key not found in cache map") |
162 | 0 | .tag("hash", key.hash.to_string()) |
163 | 0 | .tag("offset", key.offset); |
164 | 0 | return Status::IOError("key not found in in-memory cache map when read_to_iobuf"); |
165 | 0 | } |
166 | 3 | const auto& mem_block = iter->second; |
167 | 3 | if (value_offset > mem_block.size || bytes_req > mem_block.size - value_offset) { |
168 | 0 | return Status::InternalError( |
169 | 0 | "read iobuf exceeds cached block, key={}, offset={}, request_offset={}, " |
170 | 0 | "request_size={}, cached_size={}", |
171 | 0 | key.hash.to_string(), key.offset, value_offset, bytes_req, mem_block.size); |
172 | 0 | } |
173 | 3 | if (mem_block.use_iobuf) { |
174 | 1 | const size_t appended = mem_block.payload.append_to(out, bytes_req, value_offset); |
175 | 1 | if (appended != bytes_req) { |
176 | 0 | return Status::InternalError( |
177 | 0 | "short append_to from iobuf cache block, key={}, offset={}, request_offset={}, " |
178 | 0 | "request_size={}, actual_size={}", |
179 | 0 | key.hash.to_string(), key.offset, value_offset, bytes_req, appended); |
180 | 0 | } |
181 | 2 | } else { |
182 | 2 | DCHECK(mem_block.addr != nullptr); |
183 | 2 | out->append(mem_block.addr.get() + value_offset, bytes_req); |
184 | 2 | } |
185 | 3 | *bytes_read = bytes_req; |
186 | 3 | return Status::OK(); |
187 | 3 | } |
188 | | |
189 | 24.5k | Status ShardMemHashTable::clear() { |
190 | 24.5k | std::unique_lock<std::shared_mutex> lock(_shard_mt); |
191 | 24.5k | _cache_map.clear(); |
192 | 24.5k | return Status::OK(); |
193 | 24.5k | } |
194 | | |
195 | | } // namespace doris::io |