Coverage Report

Created: 2026-04-14 15:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/page_io.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/page_io.h"
19
20
#include <crc32c/crc32c.h>
21
#include <gen_cpp/segment_v2.pb.h>
22
#include <stdint.h>
23
24
#include <algorithm>
25
#include <cstring>
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "cloud/config.h"
32
#include "common/logging.h"
33
#include "cpp/sync_point.h"
34
#include "io/cache/block_file_cache.h"
35
#include "io/cache/block_file_cache_factory.h"
36
#include "io/cache/cached_remote_file_reader.h"
37
#include "io/fs/file_reader.h"
38
#include "io/fs/file_writer.h"
39
#include "runtime/runtime_profile.h"
40
#include "storage/cache/page_cache.h"
41
#include "storage/olap_common.h"
42
#include "storage/segment/encoding_info.h"
43
#include "storage/segment/page_handle.h"
44
#include "util/block_compression.h"
45
#include "util/coding.h"
46
#include "util/concurrency_stats.h"
47
#include "util/faststring.h"
48
49
namespace doris {
50
namespace segment_v2 {
51
52
// Attempts to compress the concatenation of the slices in `body` with `codec`.
//
// `*compressed_body` receives the compressed bytes only when compression saved a strictly
// positive fraction of space that is also >= `min_space_saving`. In every other case it is
// reset to an empty OwnedSlice:
//   - `codec` is null;
//   - the input exceeds the codec's maximum compressible length;
//   - compression did not save enough.
// Presumably callers treat the empty slice as "store the page uncompressed"; confirm at call
// sites. Errors or exceptions raised by the codec are returned as a non-OK Status.
Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space_saving,
                                  const std::vector<Slice>& body, OwnedSlice* compressed_body) {
    const size_t raw_size = Slice::compute_total_size(body);
    const bool can_compress = codec != nullptr && !codec->exceed_max_compress_len(raw_size);
    if (can_compress) {
        faststring compressed;
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(body, raw_size, &compressed));
        const double ratio = cast_set<double>(compressed.size()) / cast_set<double>(raw_size);
        const double saving = 1.0 - ratio;
        const bool worth_keeping = saving > 0 && saving >= min_space_saving;
        if (worth_keeping) {
            // build() shrinks the buffer to the compressed length, so the page does not keep
            // the codec's MAX_COMPRESSED_SIZE-sized scratch allocation alive.
            RETURN_IF_CATCH_EXCEPTION(*compressed_body = compressed.build());
            return Status::OK();
        }
    }
    // Compression skipped or not beneficial: hand back an empty slice.
    *compressed_body = OwnedSlice();
    return Status::OK();
}
73
74
// Appends one page to `writer`.
//
// Bytes written, in order:
//   1. every slice of `body`;
//   2. the serialized `footer`;
//   3. the serialized footer's length as fixed32 little-endian;
//   4. a CRC32C (fixed32 little-endian) over all preceding bytes of the page.
// On success `result->offset` is the writer position where the page starts, and
// `result->size` is the total number of bytes appended.
//
// The footer must have a type, an uncompressed size, and the sub-footer matching its type.
// A violation fails a CHECK.
Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body,
                          const PageFooterPB& footer, PagePointer* result) {
    // Validate the footer before touching the writer.
    CHECK(footer.has_type()) << "type must be set";
    CHECK(footer.has_uncompressed_size()) << "uncompressed_size must be set";
    switch (footer.type()) {
    case DATA_PAGE:
        CHECK(footer.has_data_page_footer());
        break;
    case INDEX_PAGE:
        CHECK(footer.has_index_page_footer());
        break;
    case DICTIONARY_PAGE:
        CHECK(footer.has_dict_page_footer());
        break;
    case SHORT_KEY_PAGE:
        CHECK(footer.has_short_key_page_footer());
        break;
    default:
        CHECK(false) << "Invalid page footer type: " << footer.type();
        break;
    }

    // Trailer = serialized footer followed by its own length.
    std::string trailer;
    footer.SerializeToString(&trailer);
    put_fixed32_le(&trailer, static_cast<uint32_t>(trailer.size()));

    std::vector<Slice> slices(body);
    slices.emplace_back(trailer);

    // CRC32C covers the body and the trailer, then is appended as the final 4 bytes.
    uint32_t crc = 0;
    for (const Slice& piece : slices) {
        crc = crc32c::Extend(crc, reinterpret_cast<const uint8_t*>(piece.data), piece.size);
    }
    uint8_t crc_bytes[sizeof(uint32_t)];
    encode_fixed32_le(crc_bytes, crc);
    slices.emplace_back(crc_bytes, sizeof(uint32_t));

    const uint64_t start = writer->bytes_appended();
    RETURN_IF_ERROR(writer->appendv(slices.data(), slices.size()));

    result->offset = start;
    result->size = cast_set<uint32_t>(writer->bytes_appended() - start);
    return Status::OK();
}
120
121
5
// Derives the block-file-cache key for a segment by hashing the final component of
// `seg_path`. That is everything after the last '/', or the whole string when it has no '/'.
io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
    const auto last_slash = seg_path.rfind('/');
    const std::string file_name =
            last_slash == std::string::npos ? seg_path : seg_path.substr(last_slash + 1);
    return io::BlockFileCache::hash(file_name);
}
125
126
3
std::string file_cache_key_str(const std::string& seg_path) {
127
3
    return file_cache_key_from_path(seg_path).to_string();
128
3
}
129
130
// Parses the footer stored at the tail of a page image.
//
// `page_slice` must end with "footer | footer_size (fixed32 LE)" and must NOT include the
// checksum. On success `*footer` is parsed and `*footer_size` holds the serialized footer
// length, so the body is the first `page_slice.size - 4 - *footer_size` bytes.
//
// Returns Corruption when:
//   - the slice is too short to hold the length field;
//   - the recorded length does not fit in front of the length field;
//   - the footer bytes fail to parse.
// The length comes from file/cache bytes, so it is validated before any pointer arithmetic.
// An unchecked value would make the body-size subtraction underflow and the parse read
// outside the buffer.
static Status parse_footer_from_page(const PageReadOptions& opts, const Slice& page_slice,
                                     PageFooterPB* footer, uint32_t* footer_size) {
    if (page_slice.size < 4) {
        return Status::Corruption("Bad page: too small size ({}), file={}", page_slice.size,
                                  opts.file_reader->path().native());
    }
    const size_t footer_end = page_slice.size - 4;
    const uint32_t size =
            decode_fixed32_le(reinterpret_cast<const uint8_t*>(page_slice.data) + footer_end);
    if (size > footer_end ||
        !footer->ParseFromArray(page_slice.data + footer_end - size, static_cast<int>(size))) {
        return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}", size,
                                  opts.file_reader->path().native());
    }
    *footer_size = size;
    return Status::OK();
}

// Reads the page at `opts.page_pointer`, stores the resulting page buffer in `*handle`, points
// `*body` at the page body, and parses `*footer`.
//
// Cache hit: when `opts.use_page_cache` is set and StoragePageCache already holds the page,
// the cached image is used directly.
//
// Cache miss: the page is read from `opts.file_reader`, then
//   1. the trailing CRC32C is verified if `opts.verify_checksum` is set;
//   2. the body is decompressed with `opts.codec` when its stored size differs from
//      `footer->uncompressed_size()`;
//   3. the encoding's pre-decoder runs if `opts.pre_decode` is set;
//   4. the page is inserted into the page cache when caching is enabled.
//
// Counters in `opts.stats` are updated along the way. Malformed pages yield Corruption.
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
                                         Slice* body, PageFooterPB* footer) {
    opts.sanity_check();
    opts.stats->total_pages_num++;

    auto cache = StoragePageCache::instance();
    PageCacheHandle cache_handle;
    StoragePageCache::CacheKey cache_key(opts.file_reader->path().native(),
                                         opts.file_reader->size(), opts.page_pointer.offset);
    VLOG_DEBUG << fmt::format("Reading page {}:{}:{}", cache_key.fname, cache_key.fsize,
                              cache_key.offset);
    if (opts.use_page_cache && cache && cache->lookup(cache_key, &cache_handle, opts.type)) {
        // we find page in cache, use it
        *handle = PageHandle(std::move(cache_handle));
        opts.stats->cached_pages_num++;
        // Cached images are inserted below without the checksum: body | footer | footer_size.
        Slice page_slice = handle->data();
        uint32_t footer_size = 0;
        RETURN_IF_ERROR(parse_footer_from_page(opts, page_slice, footer, &footer_size));
        *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
        // If read from cache, then should also recorded in uncompressed bytes read counter.
        opts.stats->uncompressed_bytes_read += body->size;
        return Status::OK();
    }

    // every page contains 4 bytes footer length and 4 bytes checksum
    const uint32_t page_size = opts.page_pointer.size;
    if (page_size < 8) {
        return Status::Corruption("Bad page: too small size ({}), file={}", page_size,
                                  opts.file_reader->path().native());
    }

    // hold compressed page at first, reset to decompressed page later
    std::unique_ptr<DataPage> page =
            std::make_unique<DataPage>(page_size, opts.use_page_cache, opts.type);
    Slice page_slice(page->data(), page_size);
    {
        SCOPED_RAW_TIMER(&opts.stats->io_ns);
        size_t bytes_read = 0;
        RETURN_IF_ERROR(opts.file_reader->read_at(opts.page_pointer.offset, page_slice, &bytes_read,
                                                  &opts.io_ctx));
        // A short read leaves the tail of the buffer uninitialized; report it instead of
        // parsing garbage (a debug-only assertion would not catch it in release builds).
        if (bytes_read != page_size) {
            return Status::Corruption("Bad page: short read ({} of {} bytes), file={}", bytes_read,
                                      page_size, opts.file_reader->path().native());
        }
        opts.stats->compressed_bytes_read += page_size;
    }

    if (opts.verify_checksum) {
        uint32_t expect = decode_fixed32_le(reinterpret_cast<const uint8_t*>(page_slice.data) +
                                            page_slice.size - 4);
        uint32_t actual = crc32c::Crc32c(page_slice.data, page_slice.size - 4);
        // here const_cast is used for testing.
        InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)};
        (void)ctx;
        TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj", &ctx);
        if (expect != actual) {
            return Status::Corruption(
                    "Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect,
                    opts.file_reader->path().native());
        }
    }

    // remove checksum suffix
    page_slice.size -= 4;
    // parse and set footer (validates footer_size against the page bounds)
    uint32_t footer_size = 0;
    RETURN_IF_ERROR(parse_footer_from_page(opts, page_slice, footer, &footer_size));

    auto body_size = cast_set<uint32_t>(page_slice.size - 4 - footer_size);
    if (body_size != footer->uncompressed_size()) { // need decompress body
        if (opts.codec == nullptr) {
            return Status::Corruption(
                    "Bad page: page is compressed but codec is NO_COMPRESSION, file={}",
                    opts.file_reader->path().native());
        }
        // The decompressed page size below is computed in uint32_t. A corrupt
        // uncompressed_size could wrap it and under-allocate the buffer the codec writes into.
        if (footer->uncompressed_size() > UINT32_MAX - 4 - footer_size) {
            return Status::Corruption(
                    "Bad page: invalid uncompressed size={}, footer_size={}, file={}",
                    footer->uncompressed_size(), footer_size, opts.file_reader->path().native());
        }
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_decompress);
        SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
        std::unique_ptr<DataPage> decompressed_page = std::make_unique<DataPage>(
                footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type);

        // decompress page body
        Slice compressed_body(page_slice.data, body_size);
        Slice decompressed_body(decompressed_page->data(), footer->uncompressed_size());
        RETURN_IF_ERROR(opts.codec->decompress(compressed_body, &decompressed_body));
        if (decompressed_body.size != footer->uncompressed_size()) {
            return Status::Corruption(
                    "Bad page: record uncompressed size={} vs real decompressed size={}, file={}",
                    footer->uncompressed_size(), decompressed_body.size,
                    opts.file_reader->path().native());
        }
        // append footer and footer size
        memcpy(decompressed_body.data + decompressed_body.size, page_slice.data + body_size,
               footer_size + 4);
        // free memory of compressed page
        page = std::move(decompressed_page);
        page_slice = Slice(page->data(), footer->uncompressed_size() + footer_size + 4);
    }

    if (opts.pre_decode) {
        const auto* encoding_info = opts.encoding_info;
        if (opts.is_dict_page) {
            // for dict page, we need to use encoding_info based on footer->dict_page_footer().encoding()
            // to get its pre_decoder
            RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR,
                                              footer->dict_page_footer().encoding(), {},
                                              &encoding_info));
        }
        if (encoding_info) {
            auto* pre_decoder = encoding_info->get_data_page_pre_decoder();
            if (pre_decoder) {
                SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_pre_decode);
                RETURN_IF_ERROR(pre_decoder->decode(
                        &page, &page_slice,
                        footer->data_page_footer().nullmap_size() + footer_size + 4,
                        opts.use_page_cache, opts.type, opts.file_reader->path().native()));
            }
        }
    }

    *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
    page->reset_size(page_slice.size);
    // "Uncompressed" means both decompressed and decoded here: the page cache may hold either
    // form. Update uncompressed_bytes_read right before inserting into the page cache, so the
    // counter matches what a later cache hit would report.
    opts.stats->uncompressed_bytes_read += body->size;
    if (opts.use_page_cache && cache) {
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_insert_page_cache);
        // insert this page into cache and return the cache handle
        cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory);
        *handle = PageHandle(std::move(cache_handle));
    } else {
        *handle = PageHandle(page.get());
    }
    page.release(); // memory now managed by handle
    return Status::OK();
}
270
271
// Reads a page via do_read_and_decompress_page, adding recovery for cloud mode.
//
// Recovery is attempted only when all of these hold:
//   - the first attempt returns CORRUPTION;
//   - the process is in cloud mode;
//   - `opts.file_reader` is an io::CachedRemoteFileReader.
// Otherwise the first status is returned unchanged.
//
// Recovery runs in two steps:
//   1. Evict the segment from the block file cache (if a cache holds it) and retry through
//      the same reader.
//   2. If that still yields CORRUPTION, retry once more against the underlying remote
//      reader.
// The status of the last attempt is returned.
Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
                                        Slice* body, PageFooterPB* footer) {
    // First try to read with file cache
    Status st = do_read_and_decompress_page(opts, handle, body, footer);
    const bool may_recover = st.is<ErrorCode::CORRUPTION>() && config::is_cloud_mode();
    if (!may_recover) {
        return st;
    }

    auto* cached_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader);
    if (!cached_reader) {
        return st;
    }

    // If we get CORRUPTION error and using file cache, clear cache and retry
    LOG(WARNING) << "Bad page may be read from file cache, need retry."
                 << " error msg: " << st.msg()
                 << " file path: " << opts.file_reader->path().native()
                 << " offset: " << opts.page_pointer.offset;

    // Evict the possibly-corrupt cached copy of this segment, if any cache holds it.
    const std::string path = opts.file_reader->path().string();
    auto file_key = file_cache_key_from_path(path);
    if (auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
        file_cache) {
        file_cache->remove_if_cached(file_key);
    }

    // Step 1: retry through the (now evicted) file cache.
    st = do_read_and_decompress_page(opts, handle, body, footer);
    if (!st.is<ErrorCode::CORRUPTION>()) {
        return st;
    }

    LOG(WARNING) << "Corruption again with retry downloading cache,"
                 << " error msg: " << st.msg()
                 << " file path: " << opts.file_reader->path().native()
                 << " offset: " << opts.page_pointer.offset;

    // Step 2: bypass the file cache entirely and read from the remote reader.
    PageReadOptions remote_opts = opts;
    remote_opts.file_reader = cached_reader->get_remote_reader();
    st = do_read_and_decompress_page(remote_opts, handle, body, footer);
    if (!st.ok()) {
        LOG(WARNING) << "Corruption again with retry read directly from remote,"
                     << " error msg: " << st.msg()
                     << " file path: " << opts.file_reader->path().native()
                     << " offset: " << opts.page_pointer.offset << " Give up.";
    }
    return st;
}
320
321
} // namespace segment_v2
322
} // namespace doris