Coverage Report

Created: 2026-03-16 21:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/page_io.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/page_io.h"
19
20
#include <crc32c/crc32c.h>
21
#include <gen_cpp/segment_v2.pb.h>
22
#include <stdint.h>
23
24
#include <algorithm>
25
#include <cstring>
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
#include "cloud/config.h"
32
#include "common/logging.h"
33
#include "cpp/sync_point.h"
34
#include "io/cache/block_file_cache.h"
35
#include "io/cache/block_file_cache_factory.h"
36
#include "io/cache/cached_remote_file_reader.h"
37
#include "io/fs/file_reader.h"
38
#include "io/fs/file_writer.h"
39
#include "runtime/runtime_profile.h"
40
#include "storage/cache/page_cache.h"
41
#include "storage/olap_common.h"
42
#include "storage/segment/encoding_info.h"
43
#include "storage/segment/page_handle.h"
44
#include "util/block_compression.h"
45
#include "util/coding.h"
46
#include "util/concurrency_stats.h"
47
#include "util/faststring.h"
48
49
namespace doris {
50
namespace segment_v2 {
51
#include "common/compile_check_begin.h"
52
53
// Compresses the concatenation of `body` with `codec`.
//
// The compressed bytes are handed back in `*compressed_body` only when compression saves a
// strictly positive fraction of space that is also at least `min_space_saving`. In every other
// case `*compressed_body` is reset to an empty OwnedSlice, which the caller treats as "store the
// body uncompressed". Those cases are: no codec, input longer than the codec accepts, or too
// little saving.
// Errors (and exceptions, via the RETURN_*_CATCH_EXCEPTION macros) from the codec are returned
// as a non-OK Status.
Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space_saving,
                                  const std::vector<Slice>& body, OwnedSlice* compressed_body) {
    const size_t raw_len = Slice::compute_total_size(body);
    const bool can_compress = codec != nullptr && !codec->exceed_max_compress_len(raw_len);
    if (can_compress) {
        faststring compressed;
        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(body, raw_len, &compressed));
        const double ratio = cast_set<double>(compressed.size()) / cast_set<double>(raw_len);
        const double saving = 1.0 - ratio;
        if (saving > 0 && saving >= min_space_saving) {
            // build() shrinks the result to the compressed length, so the page does not keep
            // the larger (up to MAX_COMPRESSED_SIZE) working buffer alive.
            RETURN_IF_CATCH_EXCEPTION(*compressed_body = compressed.build());
            return Status::OK();
        }
    }
    // Not worth compressing: an empty slice tells the caller to keep the raw body.
    *compressed_body = OwnedSlice();
    return Status::OK();
}
74
75
// Appends one page to `writer`.
//
// Layout written, in order:
//   body slices | serialized PageFooterPB | footer length (fixed32 LE) | crc32c (fixed32 LE)
// The checksum covers everything before it: the body, the serialized footer and the footer
// length.
//
// On success, `result` receives the offset at which the page starts and its total size in
// bytes. The footer must have `type` and `uncompressed_size` set, plus the sub-footer matching
// its type; violations abort via CHECK. Returns an error if the footer cannot be serialized or
// the writer fails.
Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body,
                          const PageFooterPB& footer, PagePointer* result) {
    // sanity check of page footer
    CHECK(footer.has_type()) << "type must be set";
    CHECK(footer.has_uncompressed_size()) << "uncompressed_size must be set";
    switch (footer.type()) {
    case DATA_PAGE:
        CHECK(footer.has_data_page_footer());
        break;
    case INDEX_PAGE:
        CHECK(footer.has_index_page_footer());
        break;
    case DICTIONARY_PAGE:
        CHECK(footer.has_dict_page_footer());
        break;
    case SHORT_KEY_PAGE:
        CHECK(footer.has_short_key_page_footer());
        break;
    default:
        CHECK(false) << "Invalid page footer type: " << footer.type();
        break;
    }

    std::string footer_buf; // serialized footer + footer size
    if (!footer.SerializeToString(&footer_buf)) {
        // Refuse to write a page whose footer could not be serialized.
        return Status::InternalError("Failed to serialize page footer");
    }
    put_fixed32_le(&footer_buf, static_cast<uint32_t>(footer_buf.size()));

    // body slices + footer slice + checksum slice
    std::vector<Slice> page;
    page.reserve(body.size() + 2);
    page.insert(page.end(), body.begin(), body.end());
    page.emplace_back(footer_buf);

    // checksum over body, footer and footer length
    uint8_t checksum_buf[sizeof(uint32_t)];
    uint32_t checksum = 0;
    for (const auto& slice : page) {
        checksum = crc32c::Extend(checksum, reinterpret_cast<const uint8_t*>(slice.data),
                                  slice.size);
    }
    encode_fixed32_le(checksum_buf, checksum);
    // checksum_buf and footer_buf are locals; this is safe because appendv below completes
    // before they go out of scope.
    page.emplace_back(checksum_buf, sizeof(uint32_t));

    uint64_t offset = writer->bytes_appended();
    RETURN_IF_ERROR(writer->appendv(page.data(), page.size()));

    result->offset = offset;
    result->size = cast_set<uint32_t>(writer->bytes_appended() - offset);
    return Status::OK();
}
121
122
5
// Derives the file-cache key for a segment from its path.
// Only the last path component (the text after the final '/') is hashed; a path that contains
// no '/' is hashed in full.
io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
    const size_t last_slash = seg_path.rfind('/');
    const size_t name_begin = (last_slash == std::string::npos) ? 0 : last_slash + 1;
    std::string file_name = seg_path.substr(name_begin);
    return io::BlockFileCache::hash(file_name);
}
126
127
3
std::string file_cache_key_str(const std::string& seg_path) {
128
3
    return file_cache_key_from_path(seg_path).to_string();
129
3
}
130
131
// Reads the page at opts.page_pointer and returns it through `handle`. On return, `body` points
// at the page body and `footer` holds the parsed PageFooterPB.
//
// On-disk page layout: body | footer | footer length (fixed32 LE) | crc32c (fixed32 LE).
//
// Flow:
//   1. If page caching is enabled and StoragePageCache has the page, parse it from the cache.
//   2. Otherwise read the raw page, verify the checksum when opts.verify_checksum is set, and
//      parse the footer.
//   3. If the stored body size differs from footer->uncompressed_size(), decompress the body
//      with opts.codec.
//   4. If opts.pre_decode is set and the encoding provides a pre-decoder, pre-decode the body.
//   5. Insert the result into the page cache when enabled. Otherwise `handle` owns the page.
//
// Returns Corruption for malformed pages: too small, short read, checksum mismatch, footer
// length out of range, unparsable footer, compressed page without a codec, or a decompressed
// size that disagrees with the footer.
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
                                         Slice* body, PageFooterPB* footer) {
    opts.sanity_check();
    opts.stats->total_pages_num++;

    auto cache = StoragePageCache::instance();
    PageCacheHandle cache_handle;
    StoragePageCache::CacheKey cache_key(opts.file_reader->path().native(),
                                         opts.file_reader->size(), opts.page_pointer.offset);
    VLOG_DEBUG << fmt::format("Reading page {}:{}:{}", cache_key.fname, cache_key.fsize,
                              cache_key.offset);
    if (opts.use_page_cache && cache && cache->lookup(cache_key, &cache_handle, opts.type)) {
        // we find page in cache, use it
        *handle = PageHandle(std::move(cache_handle));
        opts.stats->cached_pages_num++;
        // parse body and footer
        // Cached pages are stored without the checksum, so they end with:
        // footer | footer length (fixed32).
        Slice page_slice = handle->data();
        uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
        std::string footer_buf(page_slice.data + page_slice.size - 4 - footer_size, footer_size);
        if (!footer->ParseFromString(footer_buf)) {
            return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}",
                                      footer_size, opts.file_reader->path().native());
        }
        *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
        // If read from cache, then should also recorded in uncompressed bytes read counter.
        opts.stats->uncompressed_bytes_read += body->size;
        return Status::OK();
    }

    // every page contains 4 bytes footer length and 4 bytes checksum
    const uint32_t page_size = opts.page_pointer.size;
    if (page_size < 8) {
        return Status::Corruption("Bad page: too small size ({}), file={}", page_size,
                                  opts.file_reader->path().native());
    }

    // hold compressed page at first, reset to decompressed page later
    std::unique_ptr<DataPage> page =
            std::make_unique<DataPage>(page_size, opts.use_page_cache, opts.type);
    Slice page_slice(page->data(), page_size);
    {
        SCOPED_RAW_TIMER(&opts.stats->io_ns);
        size_t bytes_read = 0;
        RETURN_IF_ERROR(opts.file_reader->read_at(opts.page_pointer.offset, page_slice, &bytes_read,
                                                  &opts.io_ctx));
        // A short read means the buffer does not hold the whole page. Report corruption
        // instead of parsing a partially filled buffer (previously only a debug-build DCHECK).
        if (bytes_read != page_size) {
            return Status::Corruption(
                    "Bad page: short read, expected {} bytes but got {}, file={}", page_size,
                    bytes_read, opts.file_reader->path().native());
        }
        opts.stats->compressed_bytes_read += page_size;
    }

    if (opts.verify_checksum) {
        uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
        uint32_t actual = crc32c::Crc32c(page_slice.data, page_slice.size - 4);
        // here const_cast is used for testing.
        InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)};
        (void)ctx;
        TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj", &ctx);
        if (expect != actual) {
            return Status::Corruption(
                    "Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect,
                    opts.file_reader->path().native());
        }
    }

    // remove checksum suffix
    page_slice.size -= 4;
    // parse and set footer
    uint32_t footer_size = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4);
    // The footer must fit in front of the 4-byte footer length. Without this check, a bogus
    // length (e.g. with verify_checksum off) would make `size - 4 - footer_size` wrap around.
    // The parser would then be handed a pointer outside the page buffer.
    if (footer_size > page_slice.size - 4) {
        return Status::Corruption("Bad page: footer size {} exceeds page size {}, file={}",
                                  footer_size, page_size, opts.file_reader->path().native());
    }
    if (!footer->ParseFromArray(page_slice.data + page_slice.size - 4 - footer_size, footer_size)) {
        return Status::Corruption("Bad page: invalid footer, footer_size={}, file={}", footer_size,
                                  opts.file_reader->path().native());
    }

    auto body_size = cast_set<uint32_t>(page_slice.size - 4 - footer_size);
    if (body_size != footer->uncompressed_size()) { // need decompress body
        if (opts.codec == nullptr) {
            return Status::Corruption(
                    "Bad page: page is compressed but codec is NO_COMPRESSION, file={}",
                    opts.file_reader->path().native());
        }
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_decompress);
        SCOPED_RAW_TIMER(&opts.stats->decompress_ns);
        std::unique_ptr<DataPage> decompressed_page = std::make_unique<DataPage>(
                footer->uncompressed_size() + footer_size + 4, opts.use_page_cache, opts.type);

        // decompress page body
        Slice compressed_body(page_slice.data, body_size);
        Slice decompressed_body(decompressed_page->data(), footer->uncompressed_size());
        RETURN_IF_ERROR(opts.codec->decompress(compressed_body, &decompressed_body));
        if (decompressed_body.size != footer->uncompressed_size()) {
            return Status::Corruption(
                    "Bad page: record uncompressed size={} vs real decompressed size={}, file={}",
                    footer->uncompressed_size(), decompressed_body.size,
                    opts.file_reader->path().native());
        }
        // append footer and footer size
        memcpy(decompressed_body.data + decompressed_body.size, page_slice.data + body_size,
               footer_size + 4);
        // free memory of compressed page
        page = std::move(decompressed_page);
        page_slice = Slice(page->data(), footer->uncompressed_size() + footer_size + 4);
    }

    if (opts.pre_decode) {
        const auto* encoding_info = opts.encoding_info;
        if (opts.is_dict_page) {
            // for dict page, we need to use encoding_info based on footer->dict_page_footer().encoding()
            // to get its pre_decoder
            RETURN_IF_ERROR(EncodingInfo::get(FieldType::OLAP_FIELD_TYPE_VARCHAR,
                                              footer->dict_page_footer().encoding(), {},
                                              &encoding_info));
        }
        if (encoding_info) {
            auto* pre_decoder = encoding_info->get_data_page_pre_decoder();
            if (pre_decoder) {
                SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_pre_decode);
                RETURN_IF_ERROR(pre_decoder->decode(
                        &page, &page_slice,
                        footer->data_page_footer().nullmap_size() + footer_size + 4,
                        opts.use_page_cache, opts.type, opts.file_reader->path().native()));
            }
        }
    }

    *body = Slice(page_slice.data, page_slice.size - 4 - footer_size);
    page->reset_size(page_slice.size);
    // Uncompressed has 2 meanings: uncompress and decode. The buffer in pagecache maybe
    // uncompressed or decoded. So that should update the uncompressed_bytes_read counter
    // just before add it to pagecache, it will be consistency with reading data from page cache.
    opts.stats->uncompressed_bytes_read += body->size;
    if (opts.use_page_cache && cache) {
        SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().page_io_insert_page_cache);
        // insert this page into cache and return the cache handle
        cache->insert(cache_key, page.get(), &cache_handle, opts.type, opts.kept_in_memory);
        *handle = PageHandle(std::move(cache_handle));
    } else {
        *handle = PageHandle(page.get());
    }
    page.release(); // memory now managed by handle
    return Status::OK();
}
271
272
// Public entry point for reading one page; delegates to do_read_and_decompress_page().
//
// A CORRUPTION result may come from bad data in the local file cache. It is therefore retried
// only in cloud mode and only when opts.file_reader is an io::CachedRemoteFileReader, in two
// escalating steps:
//   1. Evict this file from the block file cache (if it is registered there) and read again
//      through the same reader.
//   2. If that still yields CORRUPTION, read through the reader's remote reader, bypassing
//      the file cache.
// In step 1, any non-CORRUPTION status (including OK) is returned as-is. The status from
// step 2 is always returned.
Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
                                        Slice* body, PageFooterPB* footer) {
    Status status = do_read_and_decompress_page(opts, handle, body, footer);
    // Only corruption observed in cloud mode is worth retrying; any other outcome is final.
    const bool maybe_bad_cache = status.is<ErrorCode::CORRUPTION>() && config::is_cloud_mode();
    if (!maybe_bad_cache) {
        return status;
    }

    auto* remote_cached_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader);
    if (remote_cached_reader == nullptr) {
        // Not read through the file cache: nothing to evict or bypass.
        return status;
    }

    // Shared warning format for every retry stage.
    auto log_retry = [&opts](const char* headline, const Status& s, const char* tail) {
        LOG(WARNING) << headline << " error msg: " << s.msg()
                     << " file path: " << opts.file_reader->path().native()
                     << " offset: " << opts.page_pointer.offset << tail;
    };

    log_retry("Bad page may be read from file cache, need retry.", status, "");

    // Step 1: drop any cached data for this file, then read again through the cache.
    const std::string seg_path = opts.file_reader->path().string();
    auto segment_key = file_cache_key_from_path(seg_path);
    if (auto* block_cache = io::FileCacheFactory::instance()->get_by_path(segment_key);
        block_cache) {
        block_cache->remove_if_cached(segment_key);
    }

    status = do_read_and_decompress_page(opts, handle, body, footer);
    if (!status.is<ErrorCode::CORRUPTION>()) {
        return status;
    }

    log_retry("Corruption again with retry downloading cache,", status, "");

    // Step 2: bypass the file cache and read straight from the remote reader.
    PageReadOptions remote_opts = opts;
    remote_opts.file_reader = remote_cached_reader->get_remote_reader();
    status = do_read_and_decompress_page(remote_opts, handle, body, footer);
    if (!status.ok()) {
        log_retry("Corruption again with retry read directly from remote,", status, " Give up.");
    }
    return status;
}
321
322
#include "common/compile_check_end.h"
323
} // namespace segment_v2
324
} // namespace doris