Coverage Report

Created: 2026-03-12 17:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/page_io.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/page_io.h"
19
20
#include <crc32c/crc32c.h>
21
#include <gen_cpp/segment_v2.pb.h>
22
#include <stdint.h>
23
24
#include <algorithm>
25
#include <cstring>
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "cloud/config.h"
32
#include "common/logging.h"
33
#include "cpp/sync_point.h"
34
#include "io/cache/block_file_cache.h"
35
#include "io/cache/block_file_cache_factory.h"
36
#include "io/cache/cached_remote_file_reader.h"
37
#include "io/fs/file_reader.h"
38
#include "io/fs/file_writer.h"
39
#include "runtime/runtime_profile.h"
40
#include "storage/cache/page_cache.h"
41
#include "storage/olap_common.h"
42
#include "storage/segment/encoding_info.h"
43
#include "storage/segment/page_handle.h"
44
#include "util/block_compression.h"
45
#include "util/coding.h"
46
#include "util/concurrency_stats.h"
47
#include "util/faststring.h"
48
49
namespace doris {
50
namespace segment_v2 {
51
#include "common/compile_check_begin.h"
52
53
Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space_saving,
54
2.00M
                                  const std::vector<Slice>& body, OwnedSlice* compressed_body) {
55
2.00M
    size_t uncompressed_size = Slice::compute_total_size(body);
56
2.00M
    if (codec != nullptr && !codec->exceed_max_compress_len(uncompressed_size)) {
57
340k
        faststring buf;
58
340k
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(body, uncompressed_size, &buf));
59
340k
        double space_saving =
60
340k
                1.0 - (cast_set<double>(buf.size()) / cast_set<double>(uncompressed_size));
61
        // return compressed body only when it saves more than min_space_saving
62
340k
        if (space_saving > 0 && space_saving >= min_space_saving) {
63
            // shrink the buf to fit the len size to avoid taking
64
            // up the memory of the size MAX_COMPRESSED_SIZE
65
136k
            RETURN_IF_CATCH_EXCEPTION(*compressed_body = buf.build());
66
136k
            return Status::OK();
67
136k
        }
68
340k
    }
69
    // otherwise, do not compress
70
1.87M
    OwnedSlice empty;
71
1.87M
    *compressed_body = std::move(empty);
72
1.87M
    return Status::OK();
73
2.00M
}
74
75
Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body,
76
2.06M
                          const PageFooterPB& footer, PagePointer* result) {
77
    // sanity check of page footer
78
18.4E
    CHECK(footer.has_type()) << "type must be set";
79
18.4E
    CHECK(footer.has_uncompressed_size()) << "uncompressed_size must be set";
80
2.06M
    switch (footer.type()) {
81
1.69M
    case DATA_PAGE:
82
1.69M
        CHECK(footer.has_data_page_footer());
83
1.69M
        break;
84
13.1k
    case INDEX_PAGE:
85
13.1k
        CHECK(footer.has_index_page_footer());
86
13.1k
        break;
87
308k
    case DICTIONARY_PAGE:
88
308k
        CHECK(footer.has_dict_page_footer());
89
308k
        break;
90
45.2k
    case SHORT_KEY_PAGE:
91
45.2k
        CHECK(footer.has_short_key_page_footer());
92
45.2k
        break;
93
0
    default:
94
0
        CHECK(false) << "Invalid page footer type: " << footer.type();
95
0
        break;
96
2.06M
    }
97
98
2.06M
    std::string footer_buf; // serialized footer + footer size
99
2.06M
    footer.SerializeToString(&footer_buf);
100
2.06M
    put_fixed32_le(&footer_buf, static_cast<uint32_t>(footer_buf.size()));
101
102
2.06M
    std::vector<Slice> page = body;
103
2.06M
    page.emplace_back(footer_buf);
104
105
    // checksum
106
2.06M
    uint8_t checksum_buf[sizeof(uint32_t)];
107
2.06M
    uint32_t checksum = 0;
108
5.05M
    for (const auto& slice : page) {
109
5.05M
        checksum = crc32c::Extend(checksum, (const uint8_t*)slice.data, slice.size);
110
5.05M
    }
111
2.06M
    encode_fixed32_le(checksum_buf, checksum);
112
2.06M
    page.emplace_back(checksum_buf, sizeof(uint32_t));
113
114
2.06M
    uint64_t offset = writer->bytes_appended();
115
2.06M
    RETURN_IF_ERROR(writer->appendv(&page[0], page.size()));
116
117
2.06M
    result->offset = offset;
118
2.06M
    result->size = cast_set<uint32_t>(writer->bytes_appended() - offset);
119
2.06M
    return Status::OK();
120
2.06M
}
121
122
2
io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
123
2
    std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0
124
2
    return io::BlockFileCache::hash(base);
125
2
}
126
127
1
std::string file_cache_key_str(const std::string& seg_path) {
128
1
    return file_cache_key_from_path(seg_path).to_string();
129
1
}
130
131
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
132
6.29M
                                         Slice* body, PageFooterPB* footer) {
133
6.29M
    opts.sanity_check();
134
6.29M
    opts.stats->total_pages_num++;
135
136
6.29M
    auto cache = StoragePageCache::instance();
137
6.29M
    PageCacheHandle cache_handle;
138
6.29M
    StoragePageCache::CacheKey cache_key(opts.file_reader->path().native(),
139
6.29M
                                         opts.file_reader->size(), opts.page_pointer.offset);
140
18.4E
    VLOG_DEBUG << fmt::format("Reading page {}:{}:{}", cache_key.fname, cache_key.fsize,
141
18.4E
                              cache_key.offset);
142
6.29M
    if (opts.use_page_cache && cache && cache->lookup(cache_key, &cache_handle, opts.type)) {
143
        // we find page in cache, use it
144
4.91M
        *handle = PageHandle(std::move(cache_handle));
145
4.91M
        opts.stats->cached_pages_num++;
146
        // parse body and footer
147
4.91M
        Slice page_slice = handle->data();
148
4.91M
        uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
149
4.91M
        std::string footer_buf(page_slice.data + page_slice.size - 4 - footer_size, footer_size);
150
4.91M
        if (!footer->ParseFromString(footer_buf)) {
151
0
            return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}",
152
0
                                      footer_size, opts.file_reader->path().native());
153
0
        }
154
4.91M
        *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
155
        // If read from cache, then should also recorded in uncompressed bytes read counter.
156
4.91M
        opts.stats->uncompressed_bytes_read += body->size;
157
4.91M
        return Status::OK();
158
4.91M
    }
159
160
    // every page contains 4 bytes footer length and 4 bytes checksum
161
1.37M
    const uint32_t page_size = opts.page_pointer.size;
162
1.37M
    if (page_size < 8) {
163
0
        return Status::Corruption("Bad page: too small size ({}), file={}", page_size,
164
0
                                  opts.file_reader->path().native());
165
0
    }
166
167
    // hold compressed page at first, reset to decompressed page later
168
1.37M
    std::unique_ptr<DataPage> page =
169
1.37M
            std::make_unique<DataPage>(page_size, opts.use_page_cache, opts.type);
170
1.37M
    Slice page_slice(page->data(), page_size);
171
1.37M
    {
172
1.37M
        SCOPED_RAW_TIMER(&opts.stats->io_ns);
173
1.37M
        size_t bytes_read = 0;
174
1.37M
        RETURN_IF_ERROR(opts.file_reader->read_at(opts.page_pointer.offset, page_slice, &bytes_read,
175
1.37M
                                                  &opts.io_ctx));
176
1.37M
        DCHECK_EQ(bytes_read, page_size);
177
1.37M
        opts.stats->compressed_bytes_read += page_size;
178
1.37M
    }
179
180
1.37M
    if (opts.verify_checksum) {
181
1.35M
        uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
182
1.35M
        uint32_t actual = crc32c::Crc32c(page_slice.data, page_slice.size - 4);
183
        // here const_cast is used for testing.
184
1.35M
        InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)};
185
1.35M
        (void)ctx;
186
1.35M
        TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj", &ctx);
187
1.35M
        if (expect != actual) {
188
0
            return Status::Corruption(
189
0
                    "Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect,
190
0
                    opts.file_reader->path().native());
191
0
        }
192
1.35M
    }
193
194
    // remove checksum suffix
195
1.37M
    page_slice.size -= 4;
196
    // parse and set footer
197
1.37M
    uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
198
1.37M
    if (!footer->ParseFromArray(page_slice.data + page_slice.size - 4 - footer_size, footer_size)) {
199
0
        return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}", footer_size,
200
0
                                  opts.file_reader->path().native());
201
0
    }
202
203
1.37M
    auto body_size = cast_set<uint32_t>(page_slice.size - 4 - footer_size);
204
1.37M
    if (body_size != footer->uncompressed_size()) { // need decompress body
205
111k
        if (opts.codec == nullptr) {
206
0
            return Status::Corruption(
207
0
                    "Bad page: page is compressed but codec is NO_COMPRESSION, file={}",
208
0
                    opts.file_reader->path().native());
209
0
        }
210
111k
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_decompress);
211
111k
        SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
212
111k
        std::unique_ptr<DataPage> decompressed_page = std::make_unique<DataPage>(
213
111k
                footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type);
214
215
        // decompress page body
216
111k
        Slice compressed_body(page_slice.data, body_size);
217
111k
        Slice decompressed_body(decompressed_page->data(), footer->uncompressed_size());
218
111k
        RETURN_IF_ERROR(opts.codec->decompress(compressed_body, &decompressed_body));
219
111k
        if (decompressed_body.size != footer->uncompressed_size()) {
220
0
            return Status::Corruption(
221
0
                    "Bad page: record uncompressed size={} vs real decompressed size={}, file={}",
222
0
                    footer->uncompressed_size(), decompressed_body.size,
223
0
                    opts.file_reader->path().native());
224
0
        }
225
        // append footer and footer size
226
111k
        memcpy(decompressed_body.data + decompressed_body.size, page_slice.data + body_size,
227
111k
               footer_size + 4);
228
        // free memory of compressed page
229
111k
        page = std::move(decompressed_page);
230
111k
        page_slice = Slice(page->data(), footer->uncompressed_size() + footer_size + 4);
231
111k
    }
232
233
1.37M
    if (opts.pre_decode) {
234
1.35M
        const auto* encoding_info = opts.encoding_info;
235
1.35M
        if (opts.is_dict_page) {
236
            // for dict page, we need to use encoding_info based on footer->dict_page_footer().encoding()
237
            // to get its pre_decoder
238
285k
            RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR,
239
285k
                                              footer->dict_page_footer().encoding(), {},
240
285k
                                              &encoding_info));
241
285k
        }
242
1.35M
        if (encoding_info) {
243
1.30M
            auto* pre_decoder = encoding_info->get_data_page_pre_decoder();
244
1.30M
            if (pre_decoder) {
245
851k
                SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_pre_decode);
246
851k
                RETURN_IF_ERROR(pre_decoder->decode(
247
851k
                        &page, &page_slice,
248
851k
                        footer->data_page_footer().nullmap_size() + footer_size + 4,
249
851k
                        opts.use_page_cache, opts.type, opts.file_reader->path().native()));
250
851k
            }
251
1.30M
        }
252
1.35M
    }
253
254
1.37M
    *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
255
1.37M
    page->reset_size(page_slice.size);
256
    // Uncompressed has 2 meanings: uncompress and decode. The buffer in pagecache maybe
257
    // uncompressed or decoded. So that should update the uncompressed_bytes_read counter
258
    // just before add it to pagecache, it will be consistency with reading data from page cache.
259
1.37M
    opts.stats->uncompressed_bytes_read += body->size;
260
1.37M
    if (opts.use_page_cache && cache) {
261
440k
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_insert_page_cache);
262
        // insert this page into cache and return the cache handle
263
440k
        cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory);
264
440k
        *handle = PageHandle(std::move(cache_handle));
265
937k
    } else {
266
937k
        *handle = PageHandle(page.get());
267
937k
    }
268
1.37M
    page.release(); // memory now managed by handle
269
1.37M
    return Status::OK();
270
1.37M
}
271
272
Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
273
6.27M
                                        Slice* body, PageFooterPB* footer) {
274
    // First try to read with file cache
275
6.27M
    Status st = do_read_and_decompress_page(opts, handle, body, footer);
276
6.29M
    if (!st.is<ErrorCode::CORRUPTION>() || !config::is_cloud_mode()) {
277
6.29M
        return st;
278
6.29M
    }
279
280
18.4E
    auto* cached_file_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader);
281
18.4E
    if (cached_file_reader == nullptr) {
282
0
        return st;
283
0
    }
284
285
    // If we get CORRUPTION error and using file cache, clear cache and retry
286
18.4E
    LOG(WARNING) << "Bad page may be read from file cache, need retry."
287
18.4E
                 << " error msg: " << st.msg()
288
18.4E
                 << " file path: " << opts.file_reader->path().native()
289
18.4E
                 << " offset: " << opts.page_pointer.offset;
290
291
    // Remove cache if exists
292
18.4E
    const std::string path = opts.file_reader->path().string();
293
18.4E
    auto file_key = file_cache_key_from_path(path);
294
18.4E
    auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
295
18.4E
    if (file_cache) {
296
0
        file_cache->remove_if_cached(file_key);
297
0
    }
298
299
    // Retry with file cache
300
18.4E
    st = do_read_and_decompress_page(opts, handle, body, footer);
301
18.4E
    if (!st.is<ErrorCode::CORRUPTION>()) {
302
0
        return st;
303
0
    }
304
305
18.4E
    LOG(WARNING) << "Corruption again with retry downloading cache,"
306
18.4E
                 << " error msg: " << st.msg()
307
18.4E
                 << " file path: " << opts.file_reader->path().native()
308
18.4E
                 << " offset: " << opts.page_pointer.offset;
309
310
18.4E
    PageReadOptions new_opts = opts;
311
18.4E
    new_opts.file_reader = cached_file_reader->get_remote_reader();
312
18.4E
    st = do_read_and_decompress_page(new_opts, handle, body, footer);
313
18.4E
    if (!st.ok()) {
314
        LOG(WARNING) << "Corruption again with retry read directly from remote,"
315
0
                     << " error msg: " << st.msg()
316
0
                     << " file path: " << opts.file_reader->path().native()
317
0
                     << " offset: " << opts.page_pointer.offset << " Give up.";
318
0
    }
319
18.4E
    return st;
320
18.4E
}
321
322
#include "common/compile_check_end.h"
323
} // namespace segment_v2
324
} // namespace doris