be/src/storage/segment/page_io.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/page_io.h" |
19 | | |
20 | | #include <crc32c/crc32c.h> |
21 | | #include <gen_cpp/segment_v2.pb.h> |
22 | | #include <stdint.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <cstring> |
26 | | #include <memory> |
27 | | #include <ostream> |
28 | | #include <string> |
29 | | #include <utility> |
30 | | |
31 | | #include "cloud/config.h" |
32 | | #include "common/logging.h" |
33 | | #include "cpp/sync_point.h" |
34 | | #include "io/cache/block_file_cache.h" |
35 | | #include "io/cache/block_file_cache_factory.h" |
36 | | #include "io/cache/cached_remote_file_reader.h" |
37 | | #include "io/fs/file_reader.h" |
38 | | #include "io/fs/file_writer.h" |
39 | | #include "runtime/runtime_profile.h" |
40 | | #include "storage/cache/page_cache.h" |
41 | | #include "storage/olap_common.h" |
42 | | #include "storage/segment/encoding_info.h" |
43 | | #include "storage/segment/page_handle.h" |
44 | | #include "util/block_compression.h" |
45 | | #include "util/coding.h" |
46 | | #include "util/concurrency_stats.h" |
47 | | #include "util/faststring.h" |
48 | | |
49 | | namespace doris { |
50 | | namespace segment_v2 { |
51 | | #include "common/compile_check_begin.h" |
52 | | |
// Compress the logical concatenation of the `body` slices using `codec`.
// On success `compressed_body` holds the compressed bytes. If compression is
// not worthwhile — no codec, the input exceeds the codec's max compressible
// length, or the achieved space saving is below `min_space_saving` — then
// `compressed_body` is left empty, which tells the caller to write the page
// uncompressed.
Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space_saving,
                                  const std::vector<Slice>& body, OwnedSlice* compressed_body) {
    size_t uncompressed_size = Slice::compute_total_size(body);
    if (codec != nullptr && !codec->exceed_max_compress_len(uncompressed_size)) {
        faststring buf;
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(body, uncompressed_size, &buf));
        // Fraction of bytes saved by compression, in [ -inf, 1 ).
        double space_saving =
                1.0 - (cast_set<double>(buf.size()) / cast_set<double>(uncompressed_size));
        // return compressed body only when it saves more than min_space_saving
        if (space_saving > 0 && space_saving >= min_space_saving) {
            // shrink the buf to fit the len size to avoid taking
            // up the memory of the size MAX_COMPRESSED_SIZE
            RETURN_IF_CATCH_EXCEPTION(*compressed_body = buf.build());
            return Status::OK();
        }
    }
    // otherwise, do not compress
    OwnedSlice empty;
    *compressed_body = std::move(empty);
    return Status::OK();
}
74 | | |
75 | | Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body, |
76 | 2.06M | const PageFooterPB& footer, PagePointer* result) { |
77 | | // sanity check of page footer |
78 | 18.4E | CHECK(footer.has_type()) << "type must be set"; |
79 | 18.4E | CHECK(footer.has_uncompressed_size()) << "uncompressed_size must be set"; |
80 | 2.06M | switch (footer.type()) { |
81 | 1.69M | case DATA_PAGE: |
82 | 1.69M | CHECK(footer.has_data_page_footer()); |
83 | 1.69M | break; |
84 | 13.1k | case INDEX_PAGE: |
85 | 13.1k | CHECK(footer.has_index_page_footer()); |
86 | 13.1k | break; |
87 | 308k | case DICTIONARY_PAGE: |
88 | 308k | CHECK(footer.has_dict_page_footer()); |
89 | 308k | break; |
90 | 45.2k | case SHORT_KEY_PAGE: |
91 | 45.2k | CHECK(footer.has_short_key_page_footer()); |
92 | 45.2k | break; |
93 | 0 | default: |
94 | 0 | CHECK(false) << "Invalid page footer type: " << footer.type(); |
95 | 0 | break; |
96 | 2.06M | } |
97 | | |
98 | 2.06M | std::string footer_buf; // serialized footer + footer size |
99 | 2.06M | footer.SerializeToString(&footer_buf); |
100 | 2.06M | put_fixed32_le(&footer_buf, static_cast<uint32_t>(footer_buf.size())); |
101 | | |
102 | 2.06M | std::vector<Slice> page = body; |
103 | 2.06M | page.emplace_back(footer_buf); |
104 | | |
105 | | // checksum |
106 | 2.06M | uint8_t checksum_buf[sizeof(uint32_t)]; |
107 | 2.06M | uint32_t checksum = 0; |
108 | 5.05M | for (const auto& slice : page) { |
109 | 5.05M | checksum = crc32c::Extend(checksum, (const uint8_t*)slice.data, slice.size); |
110 | 5.05M | } |
111 | 2.06M | encode_fixed32_le(checksum_buf, checksum); |
112 | 2.06M | page.emplace_back(checksum_buf, sizeof(uint32_t)); |
113 | | |
114 | 2.06M | uint64_t offset = writer->bytes_appended(); |
115 | 2.06M | RETURN_IF_ERROR(writer->appendv(&page[0], page.size())); |
116 | | |
117 | 2.06M | result->offset = offset; |
118 | 2.06M | result->size = cast_set<uint32_t>(writer->bytes_appended() - offset); |
119 | 2.06M | return Status::OK(); |
120 | 2.06M | } |
121 | | |
122 | 2 | io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) { |
123 | 2 | std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0 |
124 | 2 | return io::BlockFileCache::hash(base); |
125 | 2 | } |
126 | | |
127 | 1 | std::string file_cache_key_str(const std::string& seg_path) { |
128 | 1 | return file_cache_key_from_path(seg_path).to_string(); |
129 | 1 | } |
130 | | |
// Core page read path: look the page up in the StoragePageCache, and on a
// miss read it from `opts.file_reader`, verify the CRC32C checksum,
// decompress if the stored body size differs from the footer's
// uncompressed_size, optionally run the encoding's pre-decoder, and finally
// hand ownership of the buffer to `handle` (inserting it into the page cache
// when enabled). `body` and `footer` point into / describe the returned page.
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
                                         Slice* body, PageFooterPB* footer) {
    opts.sanity_check();
    opts.stats->total_pages_num++;

    auto cache = StoragePageCache::instance();
    PageCacheHandle cache_handle;
    // Cache key is (file name, file size, page offset).
    StoragePageCache::CacheKey cache_key(opts.file_reader->path().native(),
                                         opts.file_reader->size(), opts.page_pointer.offset);
    VLOG_DEBUG << fmt::format("Reading page {}:{}:{}", cache_key.fname, cache_key.fsize,
                              cache_key.offset);
    if (opts.use_page_cache && cache && cache->lookup(cache_key, &cache_handle, opts.type)) {
        // we find page in cache, use it
        *handle = PageHandle(std::move(cache_handle));
        opts.stats->cached_pages_num++;
        // parse body and footer
        // Cached layout: [body][footer][4B footer size]; the 4B checksum was
        // stripped before the page was inserted (see the tail of this function).
        Slice page_slice = handle->data();
        uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
        std::string footer_buf(page_slice.data + page_slice.size - 4 - footer_size, footer_size);
        if (!footer->ParseFromString(footer_buf)) {
            return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}",
                                      footer_size, opts.file_reader->path().native());
        }
        *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
        // If read from cache, then should also recorded in uncompressed bytes read counter.
        opts.stats->uncompressed_bytes_read += body->size;
        return Status::OK();
    }

    // every page contains 4 bytes footer length and 4 bytes checksum
    const uint32_t page_size = opts.page_pointer.size;
    if (page_size < 8) {
        return Status::Corruption("Bad page: too small size ({}), file={}", page_size,
                                  opts.file_reader->path().native());
    }

    // hold compressed page at first, reset to decompressed page later
    std::unique_ptr<DataPage> page =
            std::make_unique<DataPage>(page_size, opts.use_page_cache, opts.type);
    Slice page_slice(page->data(), page_size);
    {
        SCOPED_RAW_TIMER(&opts.stats->io_ns);
        size_t bytes_read = 0;
        RETURN_IF_ERROR(opts.file_reader->read_at(opts.page_pointer.offset, page_slice, &bytes_read,
                                                  &opts.io_ctx));
        DCHECK_EQ(bytes_read, page_size);
        opts.stats->compressed_bytes_read += page_size;
    }

    if (opts.verify_checksum) {
        // The last 4 bytes hold the CRC32C of everything before them.
        uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
        uint32_t actual = crc32c::Crc32c(page_slice.data, page_slice.size - 4);
        // here const_cast is used for testing.
        InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)};
        (void)ctx;
        TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj", &ctx);
        if (expect != actual) {
            return Status::Corruption(
                    "Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect,
                    opts.file_reader->path().native());
        }
    }

    // remove checksum suffix
    page_slice.size -= 4;
    // parse and set footer
    uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
    if (!footer->ParseFromArray(page_slice.data + page_slice.size - 4 - footer_size, footer_size)) {
        return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}", footer_size,
                                  opts.file_reader->path().native());
    }

    auto body_size = cast_set<uint32_t>(page_slice.size - 4 - footer_size);
    if (body_size != footer->uncompressed_size()) { // need decompress body
        if (opts.codec == nullptr) {
            return Status::Corruption(
                    "Bad page: page is compressed but codec is NO_COMPRESSION, file={}",
                    opts.file_reader->path().native());
        }
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_decompress);
        SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
        // New buffer sized for the decompressed body plus footer + footer size.
        std::unique_ptr<DataPage> decompressed_page = std::make_unique<DataPage>(
                footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type);

        // decompress page body
        Slice compressed_body(page_slice.data, body_size);
        Slice decompressed_body(decompressed_page->data(), footer->uncompressed_size());
        RETURN_IF_ERROR(opts.codec->decompress(compressed_body, &decompressed_body));
        if (decompressed_body.size != footer->uncompressed_size()) {
            return Status::Corruption(
                    "Bad page: record uncompressed size={} vs real decompressed size={}, file={}",
                    footer->uncompressed_size(), decompressed_body.size,
                    opts.file_reader->path().native());
        }
        // append footer and footer size
        memcpy(decompressed_body.data + decompressed_body.size, page_slice.data + body_size,
               footer_size + 4);
        // free memory of compressed page
        page = std::move(decompressed_page);
        page_slice = Slice(page->data(), footer->uncompressed_size() + footer_size + 4);
    }

    if (opts.pre_decode) {
        const auto* encoding_info = opts.encoding_info;
        if (opts.is_dict_page) {
            // for dict page, we need to use encoding_info based on footer->dict_page_footer().encoding()
            // to get its pre_decoder
            RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR,
                                              footer->dict_page_footer().encoding(), {},
                                              &encoding_info));
        }
        if (encoding_info) {
            auto* pre_decoder = encoding_info->get_data_page_pre_decoder();
            if (pre_decoder) {
                SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_pre_decode);
                // The pre-decoder may replace `page`/`page_slice` with a new,
                // decoded buffer; it preserves the trailing footer bytes.
                RETURN_IF_ERROR(pre_decoder->decode(
                        &page, &page_slice,
                        footer->data_page_footer().nullmap_size() + footer_size + 4,
                        opts.use_page_cache, opts.type, opts.file_reader->path().native()));
            }
        }
    }

    *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
    page->reset_size(page_slice.size);
    // Uncompressed has 2 meanings: uncompress and decode. The buffer in pagecache maybe
    // uncompressed or decoded. So that should update the uncompressed_bytes_read counter
    // just before add it to pagecache, it will be consistency with reading data from page cache.
    opts.stats->uncompressed_bytes_read += body->size;
    if (opts.use_page_cache && cache) {
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_insert_page_cache);
        // insert this page into cache and return the cache handle
        cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory);
        *handle = PageHandle(std::move(cache_handle));
    } else {
        *handle = PageHandle(page.get());
    }
    page.release(); // memory now managed by handle
    return Status::OK();
}
271 | | |
// Cloud-mode-aware wrapper around do_read_and_decompress_page. In cloud mode
// a CORRUPTION result may be caused by a stale or damaged local file-cache
// copy rather than a bad remote segment, so on corruption this evicts the
// file-cache entry and retries, and as a last resort retries reading directly
// from the remote file, bypassing the cache.
// NOTE(review): this calls do_read_and_decompress_page while the helper above
// is named read_and_decompress_page_ — presumably both are declared in
// page_io.h; confirm the intended naming.
Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
                                        Slice* body, PageFooterPB* footer) {
    // First try to read with file cache
    Status st = do_read_and_decompress_page(opts, handle, body, footer);
    if (!st.is<ErrorCode::CORRUPTION>() || !config::is_cloud_mode()) {
        // Success, a non-corruption error, or non-cloud deployment: the
        // cache-eviction retry logic below does not apply.
        return st;
    }

    auto* cached_file_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader);
    if (cached_file_reader == nullptr) {
        // Not reading through the file cache, so the corruption cannot be a
        // cache artifact — report it as-is.
        return st;
    }

    // If we get CORRUPTION error and using file cache, clear cache and retry
    LOG(WARNING) << "Bad page may be read from file cache, need retry."
                 << " error msg: " << st.msg()
                 << " file path: " << opts.file_reader->path().native()
                 << " offset: " << opts.page_pointer.offset;

    // Remove cache if exists
    const std::string path = opts.file_reader->path().string();
    auto file_key = file_cache_key_from_path(path);
    auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
    if (file_cache) {
        file_cache->remove_if_cached(file_key);
    }

    // Retry with file cache
    st = do_read_and_decompress_page(opts, handle, body, footer);
    if (!st.is<ErrorCode::CORRUPTION>()) {
        return st;
    }

    LOG(WARNING) << "Corruption again with retry downloading cache,"
                 << " error msg: " << st.msg()
                 << " file path: " << opts.file_reader->path().native()
                 << " offset: " << opts.page_pointer.offset;

    // Final attempt: read straight from the remote reader, bypassing the cache.
    PageReadOptions new_opts = opts;
    new_opts.file_reader = cached_file_reader->get_remote_reader();
    st = do_read_and_decompress_page(new_opts, handle, body, footer);
    if (!st.ok()) {
        LOG(WARNING) << "Corruption again with retry read directly from remote,"
                     << " error msg: " << st.msg()
                     << " file path: " << opts.file_reader->path().native()
                     << " offset: " << opts.page_pointer.offset << " Give up.";
    }
    return st;
}
321 | | |
322 | | #include "common/compile_check_end.h" |
323 | | } // namespace segment_v2 |
324 | | } // namespace doris |