Coverage Report

Created: 2026-04-10 18:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/page_io.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/page_io.h"
19
20
#include <crc32c/crc32c.h>
21
#include <gen_cpp/segment_v2.pb.h>
22
#include <stdint.h>
23
24
#include <algorithm>
25
#include <cstring>
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "cloud/config.h"
32
#include "common/logging.h"
33
#include "cpp/sync_point.h"
34
#include "io/cache/block_file_cache.h"
35
#include "io/cache/block_file_cache_factory.h"
36
#include "io/cache/cached_remote_file_reader.h"
37
#include "io/fs/file_reader.h"
38
#include "io/fs/file_writer.h"
39
#include "runtime/runtime_profile.h"
40
#include "storage/cache/page_cache.h"
41
#include "storage/olap_common.h"
42
#include "storage/segment/encoding_info.h"
43
#include "storage/segment/page_handle.h"
44
#include "util/block_compression.h"
45
#include "util/coding.h"
46
#include "util/concurrency_stats.h"
47
#include "util/faststring.h"
48
49
namespace doris {
50
namespace segment_v2 {
51
52
// Optionally compresses the concatenation of `body` with `codec`.
//
// The compressed bytes are stored into `*compressed_body` only when `codec` is non-null, the input
// is not larger than the codec's maximum, and the relative space saving
// (1 - compressed/uncompressed) is strictly positive and at least `min_space_saving`.
// In every other case `*compressed_body` is reset to an empty OwnedSlice, which callers treat
// as "store the page uncompressed".
//
// Returns an error if the codec fails or throws; otherwise Status::OK().
Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space_saving,
                                  const std::vector<Slice>& body, OwnedSlice* compressed_body) {
    const size_t raw_size = Slice::compute_total_size(body);
    const bool can_compress = codec != nullptr && !codec->exceed_max_compress_len(raw_size);
    if (can_compress) {
        faststring compressed;
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(body, raw_size, &compressed));
        const double ratio = cast_set<double>(compressed.size()) / cast_set<double>(raw_size);
        const double saving = 1.0 - ratio;
        if (saving > 0 && saving >= min_space_saving) {
            // build() shrinks the buffer to its used length so the result does not keep the
            // codec's worst-case (max compressed size) allocation alive.
            RETURN_IF_CATCH_EXCEPTION(*compressed_body = compressed.build());
            return Status::OK();
        }
    }
    // Not worth compressing (or not compressible with this codec): signal "no compression".
    *compressed_body = OwnedSlice();
    return Status::OK();
}
73
74
// Appends one page to `writer` and records where it landed in `*result`.
//
// On-disk layout written here:
//   body slices | serialized footer | footer length (fixed32 LE) | crc32c (fixed32 LE)
// The crc32c covers everything before it (body + footer + footer length).
//
// The footer must have `type` and `uncompressed_size` set, plus the sub-footer matching its
// type; violations abort the process via CHECK.
//
// Returns the writer's error if appending fails; otherwise Status::OK(), with result->offset set
// to the writer position before the append and result->size to the number of bytes appended.
Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body,
                          const PageFooterPB& footer, PagePointer* result) {
    // Validate the footer before anything is written.
    CHECK(footer.has_type()) << "type must be set";
    CHECK(footer.has_uncompressed_size()) << "uncompressed_size must be set";
    switch (footer.type()) {
    case DATA_PAGE:
        CHECK(footer.has_data_page_footer());
        break;
    case INDEX_PAGE:
        CHECK(footer.has_index_page_footer());
        break;
    case DICTIONARY_PAGE:
        CHECK(footer.has_dict_page_footer());
        break;
    case SHORT_KEY_PAGE:
        CHECK(footer.has_short_key_page_footer());
        break;
    default:
        CHECK(false) << "Invalid page footer type: " << footer.type();
        break;
    }

    // Serialized footer followed by its own length (length measured before it is appended).
    std::string footer_with_len;
    footer.SerializeToString(&footer_with_len);
    const auto footer_len = static_cast<uint32_t>(footer_with_len.size());
    put_fixed32_le(&footer_with_len, footer_len);

    std::vector<Slice> slices(body);
    slices.emplace_back(footer_with_len);

    // Checksum over every slice gathered so far.
    uint32_t crc = 0;
    for (const Slice& s : slices) {
        crc = crc32c::Extend(crc, reinterpret_cast<const uint8_t*>(s.data), s.size);
    }
    uint8_t crc_bytes[sizeof(uint32_t)];
    encode_fixed32_le(crc_bytes, crc);
    slices.emplace_back(crc_bytes, sizeof(uint32_t));

    const uint64_t start = writer->bytes_appended();
    RETURN_IF_ERROR(writer->appendv(slices.data(), slices.size()));

    result->offset = start;
    result->size = cast_set<uint32_t>(writer->bytes_appended() - start);
    return Status::OK();
}
120
121
5
// Computes the block-file-cache key for a segment by hashing its file name
// (the last '/'-separated component of `seg_path`, or the whole string if it has no '/').
io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
    const size_t slash = seg_path.rfind('/');
    const size_t name_begin = (slash == std::string::npos) ? 0 : slash + 1;
    std::string file_name = seg_path.substr(name_begin);
    return io::BlockFileCache::hash(file_name);
}
125
126
3
std::string file_cache_key_str(const std::string& seg_path) {
127
3
    return file_cache_key_from_path(seg_path).to_string();
128
3
}
129
130
// Reads the page described by opts.page_pointer and hands ownership of its buffer to `handle`.
//
// On a StoragePageCache hit (only when opts.use_page_cache is set) the cached buffer is reused and
// only the footer is re-parsed. Otherwise:
//   1. the raw page is read with opts.file_reader;
//   2. its trailing crc32c is checked when opts.verify_checksum is set;
//   3. the body is decompressed with opts.codec when its stored size differs from
//      footer->uncompressed_size();
//   4. when opts.pre_decode is set, the encoding's data page pre-decoder (if any) is applied;
//   5. the resulting buffer is inserted into the page cache when page caching is enabled.
//
// Page layout on disk (as written by PageIO::write_page):
//   body | serialized footer | footer size (fixed32 LE) | checksum (fixed32 LE)
// The buffer kept in the page cache has the checksum stripped.
//
// @param opts    read options; counters in opts.stats are updated.
// @param handle  receives the page buffer (either a page-cache handle or the raw page).
// @param body    set to the (decompressed / pre-decoded) body; points into `handle`'s memory.
// @param footer  receives the parsed page footer.
// @return Corruption for malformed pages, or the error from the reader / codec / pre-decoder.
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
                                         Slice* body, PageFooterPB* footer) {
    opts.sanity_check();
    opts.stats->total_pages_num++;

    auto cache = StoragePageCache::instance();
    PageCacheHandle cache_handle;
    StoragePageCache::CacheKey cache_key(opts.file_reader->path().native(),
                                         opts.file_reader->size(), opts.page_pointer.offset);
    VLOG_DEBUG << fmt::format("Reading page {}:{}:{}", cache_key.fname, cache_key.fsize,
                              cache_key.offset);
    if (opts.use_page_cache && cache && cache->lookup(cache_key, &cache_handle, opts.type)) {
        // we find page in cache, use it
        *handle = PageHandle(std::move(cache_handle));
        opts.stats->cached_pages_num++;
        // parse body and footer; cached layout is body | footer | footer size (fixed32 LE)
        Slice page_slice = handle->data();
        uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
        // Parse directly from the cached buffer instead of copying the footer into a string.
        if (!footer->ParseFromArray(page_slice.data + page_slice.size - 4 - footer_size,
                                    footer_size)) {
            return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}",
                                      footer_size, opts.file_reader->path().native());
        }
        *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
        // If read from cache, then should also recorded in uncompressed bytes read counter.
        opts.stats->uncompressed_bytes_read += body->size;
        return Status::OK();
    }

    // every page contains 4 bytes footer length and 4 bytes checksum
    const uint32_t page_size = opts.page_pointer.size;
    if (page_size < 8) {
        return Status::Corruption("Bad page: too small size ({}), file={}", page_size,
                                  opts.file_reader->path().native());
    }

    // hold compressed page at first, reset to decompressed page later
    std::unique_ptr<DataPage> page =
            std::make_unique<DataPage>(page_size, opts.use_page_cache, opts.type);
    Slice page_slice(page->data(), page_size);
    {
        SCOPED_RAW_TIMER(&opts.stats->io_ns);
        size_t bytes_read = 0;
        RETURN_IF_ERROR(opts.file_reader->read_at(opts.page_pointer.offset, page_slice, &bytes_read,
                                                  &opts.io_ctx));
        DCHECK_EQ(bytes_read, page_size);
        opts.stats->compressed_bytes_read += page_size;
    }

    if (opts.verify_checksum) {
        uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
        uint32_t actual = crc32c::Crc32c(page_slice.data, page_slice.size - 4);
        // here const_cast is used for testing.
        InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)};
        (void)ctx;
        TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj", &ctx);
        if (expect != actual) {
            return Status::Corruption(
                    "Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect,
                    opts.file_reader->path().native());
        }
    }

    // remove checksum suffix
    page_slice.size -= 4;
    // parse and set footer
    uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
    // footer_size comes straight from the page bytes. When checksum verification is disabled
    // nothing else validates it, and an oversized value would make the footer pointer below land
    // before the start of the buffer (out-of-bounds read). page_slice.size >= 4 here because
    // page_size >= 8 was checked above.
    if (footer_size > page_slice.size - 4) {
        return Status::Corruption("Bad page: footer size ({}) exceeds page size ({}), file={}",
                                  footer_size, page_size, opts.file_reader->path().native());
    }
    if (!footer->ParseFromArray(page_slice.data + page_slice.size - 4 - footer_size, footer_size)) {
        return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}", footer_size,
                                  opts.file_reader->path().native());
    }

    auto body_size = cast_set<uint32_t>(page_slice.size - 4 - footer_size);
    if (body_size != footer->uncompressed_size()) { // need decompress body
        if (opts.codec == nullptr) {
            return Status::Corruption(
                    "Bad page: page is compressed but codec is NO_COMPRESSION, file={}",
                    opts.file_reader->path().native());
        }
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_decompress);
        SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
        std::unique_ptr<DataPage> decompressed_page = std::make_unique<DataPage>(
                footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type);

        // decompress page body
        Slice compressed_body(page_slice.data, body_size);
        Slice decompressed_body(decompressed_page->data(), footer->uncompressed_size());
        RETURN_IF_ERROR(opts.codec->decompress(compressed_body, &decompressed_body));
        if (decompressed_body.size != footer->uncompressed_size()) {
            return Status::Corruption(
                    "Bad page: record uncompressed size={} vs real decompressed size={}, file={}",
                    footer->uncompressed_size(), decompressed_body.size,
                    opts.file_reader->path().native());
        }
        // append footer and footer size
        memcpy(decompressed_body.data + decompressed_body.size, page_slice.data + body_size,
               footer_size + 4);
        // free memory of compressed page
        page = std::move(decompressed_page);
        page_slice = Slice(page->data(), footer->uncompressed_size() + footer_size + 4);
    }

    if (opts.pre_decode) {
        const auto* encoding_info = opts.encoding_info;
        if (opts.is_dict_page) {
            // for dict page, we need to use encoding_info based on footer->dict_page_footer().encoding()
            // to get its pre_decoder
            RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR,
                                              footer->dict_page_footer().encoding(), {},
                                              &encoding_info));
        }
        if (encoding_info) {
            auto* pre_decoder = encoding_info->get_data_page_pre_decoder();
            if (pre_decoder) {
                SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_pre_decode);
                RETURN_IF_ERROR(pre_decoder->decode(
                        &page, &page_slice,
                        footer->data_page_footer().nullmap_size() + footer_size + 4,
                        opts.use_page_cache, opts.type, opts.file_reader->path().native()));
            }
        }
    }

    *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
    page->reset_size(page_slice.size);
    // Uncompressed has 2 meanings: uncompress and decode. The buffer in pagecache maybe
    // uncompressed or decoded. So that should update the uncompressed_bytes_read counter
    // just before add it to pagecache, it will be consistency with reading data from page cache.
    opts.stats->uncompressed_bytes_read += body->size;
    if (opts.use_page_cache && cache) {
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_insert_page_cache);
        // insert this page into cache and return the cache handle
        cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory);
        *handle = PageHandle(std::move(cache_handle));
    } else {
        *handle = PageHandle(page.get());
    }
    page.release(); // memory now managed by handle
    return Status::OK();
}
270
271
// Reads and decompresses a page, retrying in cloud mode when the failure may come from a bad
// file-cache copy.
//
// Retry policy (only for CORRUPTION errors, only when config::is_cloud_mode(), and only when
// opts.file_reader is an io::CachedRemoteFileReader):
//   attempt 1: normal read through opts.file_reader;
//   attempt 2: evict this segment from the block file cache (if it is cached there) and read
//              again through the same reader;
//   attempt 3: read with the cached reader's underlying remote reader
//              (CachedRemoteFileReader::get_remote_reader()).
// Any non-CORRUPTION status from attempts 1-2 is returned immediately; the status of the last
// attempt made is returned.
Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
                                        Slice* body, PageFooterPB* footer) {
    Status st = do_read_and_decompress_page(opts, handle, body, footer);
    const bool maybe_bad_cache = st.is<ErrorCode::CORRUPTION>() && config::is_cloud_mode();
    if (!maybe_bad_cache) {
        return st;
    }

    auto* cached_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader);
    if (!cached_reader) {
        return st;
    }

    LOG(WARNING) << "Bad page may be read from file cache, need retry."
                 << " error msg: " << st.msg()
                 << " file path: " << opts.file_reader->path().native()
                 << " offset: " << opts.page_pointer.offset;

    // Evict this segment's entry from the block file cache, if one exists.
    const std::string seg_path = opts.file_reader->path().string();
    auto cache_key = file_cache_key_from_path(seg_path);
    if (auto* block_cache = io::FileCacheFactory::instance()->get_by_path(cache_key);
        block_cache) {
        block_cache->remove_if_cached(cache_key);
    }

    // Second attempt: same reader, after eviction.
    st = do_read_and_decompress_page(opts, handle, body, footer);
    if (!st.is<ErrorCode::CORRUPTION>()) {
        return st;
    }

    LOG(WARNING) << "Corruption again with retry downloading cache,"
                 << " error msg: " << st.msg()
                 << " file path: " << opts.file_reader->path().native()
                 << " offset: " << opts.page_pointer.offset;

    // Last attempt: bypass the file cache via the underlying remote reader.
    PageReadOptions remote_opts = opts;
    remote_opts.file_reader = cached_reader->get_remote_reader();
    st = do_read_and_decompress_page(remote_opts, handle, body, footer);
    if (!st.ok()) {
        LOG(WARNING) << "Corruption again with retry read directly from remote,"
                     << " error msg: " << st.msg()
                     << " file path: " << opts.file_reader->path().native()
                     << " offset: " << opts.page_pointer.offset << " Give up.";
    }
    return st;
}
320
321
} // namespace segment_v2
322
} // namespace doris