be/src/storage/segment/page_io.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/page_io.h" |
19 | | |
20 | | #include <crc32c/crc32c.h> |
21 | | #include <gen_cpp/segment_v2.pb.h> |
22 | | #include <stdint.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <cstring> |
26 | | #include <memory> |
27 | | #include <ostream> |
28 | | #include <string> |
29 | | #include <utility> |
30 | | |
31 | | #include "cloud/config.h" |
32 | | #include "common/logging.h" |
33 | | #include "cpp/sync_point.h" |
34 | | #include "io/cache/block_file_cache.h" |
35 | | #include "io/cache/block_file_cache_factory.h" |
36 | | #include "io/cache/cached_remote_file_reader.h" |
37 | | #include "io/fs/file_reader.h" |
38 | | #include "io/fs/file_writer.h" |
39 | | #include "runtime/runtime_profile.h" |
40 | | #include "storage/cache/page_cache.h" |
41 | | #include "storage/olap_common.h" |
42 | | #include "storage/segment/encoding_info.h" |
43 | | #include "storage/segment/page_handle.h" |
44 | | #include "util/block_compression.h" |
45 | | #include "util/coding.h" |
46 | | #include "util/concurrency_stats.h" |
47 | | #include "util/faststring.h" |
48 | | |
49 | | namespace doris { |
50 | | namespace segment_v2 { |
51 | | |
52 | | Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space_saving, |
53 | 2.01M | const std::vector<Slice>& body, OwnedSlice* compressed_body) { |
54 | 2.01M | size_t uncompressed_size = Slice::compute_total_size(body); |
55 | 2.01M | if (codec != nullptr && !codec->exceed_max_compress_len(uncompressed_size)) { |
56 | 347k | faststring buf; |
57 | 347k | RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(body, uncompressed_size, &buf)); |
58 | 347k | double space_saving = |
59 | 347k | 1.0 - (cast_set<double>(buf.size()) / cast_set<double>(uncompressed_size)); |
60 | | // return compressed body only when it saves more than min_space_saving |
61 | 347k | if (space_saving > 0 && space_saving >= min_space_saving) { |
62 | | // shrink the buf to fit the len size to avoid taking |
63 | | // up the memory of the size MAX_COMPRESSED_SIZE |
64 | 154k | RETURN_IF_CATCH_EXCEPTION(*compressed_body = buf.build()); |
65 | 154k | return Status::OK(); |
66 | 154k | } |
67 | 347k | } |
68 | | // otherwise, do not compress |
69 | 1.85M | OwnedSlice empty; |
70 | 1.85M | *compressed_body = std::move(empty); |
71 | 1.85M | return Status::OK(); |
72 | 2.01M | } |
73 | | |
74 | | Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body, |
75 | 2.06M | const PageFooterPB& footer, PagePointer* result) { |
76 | | // sanity check of page footer |
77 | 18.4E | CHECK(footer.has_type()) << "type must be set"; |
78 | 18.4E | CHECK(footer.has_uncompressed_size()) << "uncompressed_size must be set"; |
79 | 2.06M | switch (footer.type()) { |
80 | 1.70M | case DATA_PAGE: |
81 | 1.70M | CHECK(footer.has_data_page_footer()); |
82 | 1.70M | break; |
83 | 13.8k | case INDEX_PAGE: |
84 | 13.8k | CHECK(footer.has_index_page_footer()); |
85 | 13.8k | break; |
86 | 303k | case DICTIONARY_PAGE: |
87 | 303k | CHECK(footer.has_dict_page_footer()); |
88 | 303k | break; |
89 | 46.5k | case SHORT_KEY_PAGE: |
90 | 46.5k | CHECK(footer.has_short_key_page_footer()); |
91 | 46.5k | break; |
92 | 0 | default: |
93 | 0 | CHECK(false) << "Invalid page footer type: " << footer.type(); |
94 | 0 | break; |
95 | 2.06M | } |
96 | | |
97 | 2.06M | std::string footer_buf; // serialized footer + footer size |
98 | 2.06M | footer.SerializeToString(&footer_buf); |
99 | 2.06M | put_fixed32_le(&footer_buf, static_cast<uint32_t>(footer_buf.size())); |
100 | | |
101 | 2.06M | std::vector<Slice> page = body; |
102 | 2.06M | page.emplace_back(footer_buf); |
103 | | |
104 | | // checksum |
105 | 2.06M | uint8_t checksum_buf[sizeof(uint32_t)]; |
106 | 2.06M | uint32_t checksum = 0; |
107 | 5.05M | for (const auto& slice : page) { |
108 | 5.05M | checksum = crc32c::Extend(checksum, (const uint8_t*)slice.data, slice.size); |
109 | 5.05M | } |
110 | 2.06M | encode_fixed32_le(checksum_buf, checksum); |
111 | 2.06M | page.emplace_back(checksum_buf, sizeof(uint32_t)); |
112 | | |
113 | 2.06M | uint64_t offset = writer->bytes_appended(); |
114 | 2.06M | RETURN_IF_ERROR(writer->appendv(&page[0], page.size())); |
115 | | |
116 | 2.06M | result->offset = offset; |
117 | 2.06M | result->size = cast_set<uint32_t>(writer->bytes_appended() - offset); |
118 | 2.06M | return Status::OK(); |
119 | 2.06M | } |
120 | | |
121 | 5 | io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) { |
122 | 5 | std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0 |
123 | 5 | return io::BlockFileCache::hash(base); |
124 | 5 | } |
125 | | |
126 | 3 | std::string file_cache_key_str(const std::string& seg_path) { |
127 | 3 | return file_cache_key_from_path(seg_path).to_string(); |
128 | 3 | } |
129 | | |
// Reads one page (located by opts.page_pointer) through the page cache,
// verifies its checksum, decompresses it if needed, optionally pre-decodes
// it, and hands ownership of the buffer to *handle. On success *body points
// at the page body and *footer holds the parsed footer.
// Returns Status::Corruption on any structural or checksum mismatch.
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
                                         Slice* body, PageFooterPB* footer) {
    opts.sanity_check();
    opts.stats->total_pages_num++;

    // Cache key is (file path, file size, page offset) so a rewritten file
    // with a different size cannot serve stale pages.
    auto cache = StoragePageCache::instance();
    PageCacheHandle cache_handle;
    StoragePageCache::CacheKey cache_key(opts.file_reader->path().native(),
                                         opts.file_reader->size(), opts.page_pointer.offset);
    VLOG_DEBUG << fmt::format("Reading page {}:{}:{}", cache_key.fname, cache_key.fsize,
                              cache_key.offset);
    if (opts.use_page_cache && cache && cache->lookup(cache_key, &cache_handle, opts.type)) {
        // we find page in cache, use it
        *handle = PageHandle(std::move(cache_handle));
        opts.stats->cached_pages_num++;
        // parse body and footer
        // Cached layout is: body | footer | footer size (4B LE); the 4-byte
        // checksum is stripped before insertion (see the cache-miss path below).
        Slice page_slice = handle->data();
        uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
        std::string footer_buf(page_slice.data + page_slice.size - 4 - footer_size, footer_size);
        if (!footer->ParseFromString(footer_buf)) {
            return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}",
                                      footer_size, opts.file_reader->path().native());
        }
        *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
        // If read from cache, then should also recorded in uncompressed bytes read counter.
        opts.stats->uncompressed_bytes_read += body->size;
        return Status::OK();
    }

    // every page contains 4 bytes footer length and 4 bytes checksum
    const uint32_t page_size = opts.page_pointer.size;
    if (page_size < 8) {
        return Status::Corruption("Bad page: too small size ({}), file={}", page_size,
                                  opts.file_reader->path().native());
    }

    // hold compressed page at first, reset to decompressed page later
    std::unique_ptr<DataPage> page =
            std::make_unique<DataPage>(page_size, opts.use_page_cache, opts.type);
    Slice page_slice(page->data(), page_size);
    {
        // Raw read of the whole on-disk page, timed into io_ns.
        SCOPED_RAW_TIMER(&opts.stats->io_ns);
        size_t bytes_read = 0;
        RETURN_IF_ERROR(opts.file_reader->read_at(opts.page_pointer.offset, page_slice, &bytes_read,
                                                  &opts.io_ctx));
        DCHECK_EQ(bytes_read, page_size);
        opts.stats->compressed_bytes_read += page_size;
    }

    if (opts.verify_checksum) {
        // Stored checksum is the trailing 4 bytes; recompute over the rest.
        uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
        uint32_t actual = crc32c::Crc32c(page_slice.data, page_slice.size - 4);
        // here const_cast is used for testing.
        InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)};
        (void)ctx;
        TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj", &ctx);
        if (expect != actual) {
            return Status::Corruption(
                    "Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect,
                    opts.file_reader->path().native());
        }
    }

    // remove checksum suffix
    page_slice.size -= 4;
    // parse and set footer
    uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
    if (!footer->ParseFromArray(page_slice.data + page_slice.size - 4 - footer_size, footer_size)) {
        return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}", footer_size,
                                  opts.file_reader->path().native());
    }

    // A body whose size differs from the recorded uncompressed size must be
    // compressed; equal sizes mean the page was stored uncompressed.
    auto body_size = cast_set<uint32_t>(page_slice.size - 4 - footer_size);
    if (body_size != footer->uncompressed_size()) { // need decompress body
        if (opts.codec == nullptr) {
            return Status::Corruption(
                    "Bad page: page is compressed but codec is NO_COMPRESSION, file={}",
                    opts.file_reader->path().native());
        }
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_decompress);
        SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
        // New buffer sized for decompressed body + footer + footer size.
        std::unique_ptr<DataPage> decompressed_page = std::make_unique<DataPage>(
                footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type);

        // decompress page body
        Slice compressed_body(page_slice.data, body_size);
        Slice decompressed_body(decompressed_page->data(), footer->uncompressed_size());
        RETURN_IF_ERROR(opts.codec->decompress(compressed_body, &decompressed_body));
        if (decompressed_body.size != footer->uncompressed_size()) {
            return Status::Corruption(
                    "Bad page: record uncompressed size={} vs real decompressed size={}, file={}",
                    footer->uncompressed_size(), decompressed_body.size,
                    opts.file_reader->path().native());
        }
        // append footer and footer size
        memcpy(decompressed_body.data + decompressed_body.size, page_slice.data + body_size,
               footer_size + 4);
        // free memory of compressed page
        page = std::move(decompressed_page);
        page_slice = Slice(page->data(), footer->uncompressed_size() + footer_size + 4);
    }

    if (opts.pre_decode) {
        const auto* encoding_info = opts.encoding_info;
        if (opts.is_dict_page) {
            // for dict page, we need to use encoding_info based on footer->dict_page_footer().encoding()
            // to get its pre_decoder
            RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR,
                                              footer->dict_page_footer().encoding(), {},
                                              &encoding_info));
        }
        if (encoding_info) {
            auto* pre_decoder = encoding_info->get_data_page_pre_decoder();
            if (pre_decoder) {
                SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_pre_decode);
                // decode() may replace `page` and `page_slice` with a new
                // buffer holding the pre-decoded representation.
                RETURN_IF_ERROR(pre_decoder->decode(
                        &page, &page_slice,
                        footer->data_page_footer().nullmap_size() + footer_size + 4,
                        opts.use_page_cache, opts.type, opts.file_reader->path().native()));
            }
        }
    }

    *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
    page->reset_size(page_slice.size);
    // Uncompressed has 2 meanings: uncompress and decode. The buffer in pagecache maybe
    // uncompressed or decoded. So that should update the uncompressed_bytes_read counter
    // just before add it to pagecache, it will be consistency with reading data from page cache.
    opts.stats->uncompressed_bytes_read += body->size;
    if (opts.use_page_cache && cache) {
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_insert_page_cache);
        // insert this page into cache and return the cache handle
        cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory);
        *handle = PageHandle(std::move(cache_handle));
    } else {
        *handle = PageHandle(page.get());
    }
    page.release(); // memory now managed by handle
    return Status::OK();
}
270 | | |
271 | | Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, |
272 | 6.59M | Slice* body, PageFooterPB* footer) { |
273 | | // First try to read with file cache |
274 | 6.59M | Status st = do_read_and_decompress_page(opts, handle, body, footer); |
275 | 6.59M | if (!st.is<ErrorCode::CORRUPTION>() || !config::is_cloud_mode()) { |
276 | 6.58M | return st; |
277 | 6.58M | } |
278 | | |
279 | 10.8k | auto* cached_file_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader); |
280 | 10.8k | if (cached_file_reader == nullptr) { |
281 | 0 | return st; |
282 | 0 | } |
283 | | |
284 | | // If we get CORRUPTION error and using file cache, clear cache and retry |
285 | 10.8k | LOG(WARNING) << "Bad page may be read from file cache, need retry." |
286 | 10.8k | << " error msg: " << st.msg() |
287 | 10.8k | << " file path: " << opts.file_reader->path().native() |
288 | 10.8k | << " offset: " << opts.page_pointer.offset; |
289 | | |
290 | | // Remove cache if exists |
291 | 10.8k | const std::string path = opts.file_reader->path().string(); |
292 | 10.8k | auto file_key = file_cache_key_from_path(path); |
293 | 10.8k | auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); |
294 | 10.8k | if (file_cache) { |
295 | 0 | file_cache->remove_if_cached(file_key); |
296 | 0 | } |
297 | | |
298 | | // Retry with file cache |
299 | 10.8k | st = do_read_and_decompress_page(opts, handle, body, footer); |
300 | 10.8k | if (!st.is<ErrorCode::CORRUPTION>()) { |
301 | 0 | return st; |
302 | 0 | } |
303 | | |
304 | 10.8k | LOG(WARNING) << "Corruption again with retry downloading cache," |
305 | 10.8k | << " error msg: " << st.msg() |
306 | 10.8k | << " file path: " << opts.file_reader->path().native() |
307 | 10.8k | << " offset: " << opts.page_pointer.offset; |
308 | | |
309 | 10.8k | PageReadOptions new_opts = opts; |
310 | 10.8k | new_opts.file_reader = cached_file_reader->get_remote_reader(); |
311 | 10.8k | st = do_read_and_decompress_page(new_opts, handle, body, footer); |
312 | 10.8k | if (!st.ok()) { |
313 | | LOG(WARNING) << "Corruption again with retry read directly from remote," |
314 | 0 | << " error msg: " << st.msg() |
315 | 0 | << " file path: " << opts.file_reader->path().native() |
316 | 0 | << " offset: " << opts.page_pointer.offset << " Give up."; |
317 | 0 | } |
318 | 10.8k | return st; |
319 | 10.8k | } |
320 | | |
321 | | } // namespace segment_v2 |
322 | | } // namespace doris |