Coverage Report

Created: 2026-06-30 22:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_page_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/vparquet_page_reader.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <stddef.h>
23
#include <stdint.h>
24
25
#include <algorithm>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/config.h"
29
#include "format/parquet/parquet_common.h"
30
#include "io/fs/buffered_reader.h"
31
#include "runtime/runtime_profile.h"
32
#include "storage/cache/page_cache.h"
33
#include "util/slice.h"
34
#include "util/thrift_util.h"
35
36
namespace doris {
37
namespace io {
38
struct IOContext;
39
} // namespace io
40
} // namespace doris
41
42
namespace doris {
43
static constexpr size_t INIT_PAGE_HEADER_SIZE = 128;
44
45
164
void ParquetPageCacheKeyBuilder::init(const std::string& path, int64_t mtime) {
46
164
    _file_key_prefix = fmt::format("{}::{}", path, mtime);
47
164
}
48
49
template <bool IN_COLLECTION, bool OFFSET_INDEX>
50
PageReader<IN_COLLECTION, OFFSET_INDEX>::PageReader(io::BufferedStreamReader* reader,
51
                                                    io::IOContext* io_ctx, uint64_t offset,
52
                                                    uint64_t length, size_t total_rows,
53
                                                    const tparquet::ColumnMetaData& metadata,
54
                                                    const ParquetPageReadContext& page_read_ctx,
55
                                                    const tparquet::OffsetIndex* offset_index)
56
164
        : _reader(reader),
57
164
          _io_ctx(io_ctx),
58
164
          _offset(offset),
59
164
          _start_offset(offset),
60
164
          _end_offset(offset + length),
61
164
          _total_rows(total_rows),
62
164
          _metadata(metadata),
63
164
          _page_read_ctx(page_read_ctx),
64
164
          _offset_index(offset_index) {
65
164
    _next_header_offset = _offset;
66
164
    _state = INITIALIZED;
67
164
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
68
69
164
    if constexpr (OFFSET_INDEX) {
70
0
        _end_row = _offset_index->page_locations.size() >= 2
71
0
                           ? _offset_index->page_locations[1].first_row_index
72
0
                           : _total_rows;
73
0
    }
74
164
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
_ZN5doris10PageReaderILb1ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
Line
Count
Source
56
3
        : _reader(reader),
57
3
          _io_ctx(io_ctx),
58
3
          _offset(offset),
59
3
          _start_offset(offset),
60
3
          _end_offset(offset + length),
61
3
          _total_rows(total_rows),
62
3
          _metadata(metadata),
63
3
          _page_read_ctx(page_read_ctx),
64
3
          _offset_index(offset_index) {
65
3
    _next_header_offset = _offset;
66
3
    _state = INITIALIZED;
67
3
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
68
69
    if constexpr (OFFSET_INDEX) {
70
        _end_row = _offset_index->page_locations.size() >= 2
71
                           ? _offset_index->page_locations[1].first_row_index
72
                           : _total_rows;
73
    }
74
3
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
_ZN5doris10PageReaderILb0ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
Line
Count
Source
56
161
        : _reader(reader),
57
161
          _io_ctx(io_ctx),
58
161
          _offset(offset),
59
161
          _start_offset(offset),
60
161
          _end_offset(offset + length),
61
161
          _total_rows(total_rows),
62
161
          _metadata(metadata),
63
161
          _page_read_ctx(page_read_ctx),
64
161
          _offset_index(offset_index) {
65
161
    _next_header_offset = _offset;
66
161
    _state = INITIALIZED;
67
161
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
68
69
    if constexpr (OFFSET_INDEX) {
70
        _end_row = _offset_index->page_locations.size() >= 2
71
                           ? _offset_index->page_locations[1].first_row_index
72
                           : _total_rows;
73
    }
74
161
}
75
76
template <bool IN_COLLECTION, bool OFFSET_INDEX>
77
190
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
78
190
    if (_state == HEADER_PARSED) {
79
0
        return Status::OK();
80
0
    }
81
190
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
82
0
        return Status::IOError("Out-of-bounds Access");
83
0
    }
84
190
    if (UNLIKELY(_offset != _next_header_offset)) {
85
0
        return Status::IOError("Wrong header position, should seek to a page header first");
86
0
    }
87
190
    if (UNLIKELY(_state != INITIALIZED)) {
88
0
        return Status::IOError("Should skip or load current page to get next page");
89
0
    }
90
91
190
    _page_statistics.page_read_counter += 1;
92
93
    // Parse page header from file; header bytes are saved for possible cache insertion
94
190
    const uint8_t* page_header_buf = nullptr;
95
190
    size_t max_size = _end_offset - _offset;
96
190
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
97
190
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
98
190
    uint32_t real_header_size = 0;
99
100
    // Try a header-only lookup in the page cache. Cached pages store
101
    // header + optional v2 levels + uncompressed payload, so we can
102
    // parse the page header directly from the cached bytes and avoid
103
    // a file read for the header.
104
190
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
105
190
        StoragePageCache::instance() != nullptr) {
106
190
        PageCacheHandle handle;
107
190
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
108
190
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
109
            // Parse header directly from cached data
110
110
            _page_cache_handle = std::move(handle);
111
110
            Slice s = _page_cache_handle.data();
112
110
            real_header_size = cast_set<uint32_t>(s.size);
113
110
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
114
110
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
115
110
                                             &real_header_size, true, &_cur_page_header);
116
110
            if (!st.ok()) return st;
117
            // Increment page cache counters for a true cache hit on header+payload
118
110
            _page_statistics.page_cache_hit_counter += 1;
119
            // Detect whether the cached payload is compressed or decompressed and record
120
110
            bool is_cache_payload_decompressed =
121
110
                    should_cache_decompressed(&_cur_page_header, _metadata);
122
123
110
            if (is_cache_payload_decompressed) {
124
110
                _page_statistics.page_cache_decompressed_hit_counter += 1;
125
110
            } else {
126
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
127
0
            }
128
129
110
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
130
131
110
            if constexpr (OFFSET_INDEX == false) {
132
110
                if (is_header_v2()) {
133
8
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
134
101
                } else if constexpr (!IN_COLLECTION) {
135
101
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
136
101
                }
137
110
            }
138
139
            // Save header bytes for later use (e.g., to insert updated cache entries)
140
110
            _header_buf.assign(s.data, s.data + real_header_size);
141
110
            _last_header_size = real_header_size;
142
110
            _page_statistics.parse_page_header_num++;
143
110
            _offset += real_header_size;
144
110
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
145
110
            _state = HEADER_PARSED;
146
110
            return Status::OK();
147
110
        } else {
148
80
            _page_statistics.page_cache_missing_counter += 1;
149
            // Clear any existing cache handle on miss to avoid holding stale handle
150
80
            _page_cache_handle = PageCacheHandle();
151
80
        }
152
190
    }
153
    // NOTE: page cache lookup for *decompressed* page data is handled in
154
    // ColumnChunkReader::load_page_data(). PageReader should only be
155
    // responsible for parsing the header bytes from the file and saving
156
    // them in `_header_buf` for possible later insertion into the cache.
157
81
    while (true) {
158
81
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
159
0
            return Status::EndOfFile("stop");
160
0
        }
161
81
        header_size = std::min(header_size, max_size);
162
81
        {
163
81
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
164
81
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
165
81
        }
166
81
        real_header_size = cast_set<uint32_t>(header_size);
167
81
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
168
81
        auto st =
169
81
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
170
81
        if (st.ok()) {
171
80
            break;
172
80
        }
173
1
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
174
0
            return Status::IOError(
175
0
                    "Failed to deserialize parquet page header. offset: {}, "
176
0
                    "header size: {}, end offset: {}, real header size: {}",
177
0
                    _offset, header_size, _end_offset, real_header_size);
178
0
        }
179
1
        header_size <<= 2;
180
1
    }
181
182
80
    if constexpr (OFFSET_INDEX == false) {
183
80
        if (is_header_v2()) {
184
4
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
185
74
        } else if constexpr (!IN_COLLECTION) {
186
74
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
187
74
        }
188
80
    }
189
190
    // Save header bytes for possible cache insertion later
191
80
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
192
80
    _last_header_size = real_header_size;
193
80
    _page_statistics.parse_page_header_num++;
194
80
    _offset += real_header_size;
195
80
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
196
80
    _state = HEADER_PARSED;
197
80
    return Status::OK();
198
80
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE17parse_page_headerEv
_ZN5doris10PageReaderILb1ELb0EE17parse_page_headerEv
Line
Count
Source
77
3
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
78
3
    if (_state == HEADER_PARSED) {
79
0
        return Status::OK();
80
0
    }
81
3
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
82
0
        return Status::IOError("Out-of-bounds Access");
83
0
    }
84
3
    if (UNLIKELY(_offset != _next_header_offset)) {
85
0
        return Status::IOError("Wrong header position, should seek to a page header first");
86
0
    }
87
3
    if (UNLIKELY(_state != INITIALIZED)) {
88
0
        return Status::IOError("Should skip or load current page to get next page");
89
0
    }
90
91
3
    _page_statistics.page_read_counter += 1;
92
93
    // Parse page header from file; header bytes are saved for possible cache insertion
94
3
    const uint8_t* page_header_buf = nullptr;
95
3
    size_t max_size = _end_offset - _offset;
96
3
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
97
3
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
98
3
    uint32_t real_header_size = 0;
99
100
    // Try a header-only lookup in the page cache. Cached pages store
101
    // header + optional v2 levels + uncompressed payload, so we can
102
    // parse the page header directly from the cached bytes and avoid
103
    // a file read for the header.
104
3
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
105
3
        StoragePageCache::instance() != nullptr) {
106
3
        PageCacheHandle handle;
107
3
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
108
3
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
109
            // Parse header directly from cached data
110
1
            _page_cache_handle = std::move(handle);
111
1
            Slice s = _page_cache_handle.data();
112
1
            real_header_size = cast_set<uint32_t>(s.size);
113
1
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
114
1
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
115
1
                                             &real_header_size, true, &_cur_page_header);
116
1
            if (!st.ok()) return st;
117
            // Increment page cache counters for a true cache hit on header+payload
118
1
            _page_statistics.page_cache_hit_counter += 1;
119
            // Detect whether the cached payload is compressed or decompressed and record
120
1
            bool is_cache_payload_decompressed =
121
1
                    should_cache_decompressed(&_cur_page_header, _metadata);
122
123
1
            if (is_cache_payload_decompressed) {
124
1
                _page_statistics.page_cache_decompressed_hit_counter += 1;
125
1
            } else {
126
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
127
0
            }
128
129
1
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
130
131
1
            if constexpr (OFFSET_INDEX == false) {
132
1
                if (is_header_v2()) {
133
0
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
134
1
                } else if constexpr (!IN_COLLECTION) {
135
1
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
136
1
                }
137
1
            }
138
139
            // Save header bytes for later use (e.g., to insert updated cache entries)
140
1
            _header_buf.assign(s.data, s.data + real_header_size);
141
1
            _last_header_size = real_header_size;
142
1
            _page_statistics.parse_page_header_num++;
143
1
            _offset += real_header_size;
144
1
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
145
1
            _state = HEADER_PARSED;
146
1
            return Status::OK();
147
2
        } else {
148
2
            _page_statistics.page_cache_missing_counter += 1;
149
            // Clear any existing cache handle on miss to avoid holding stale handle
150
2
            _page_cache_handle = PageCacheHandle();
151
2
        }
152
3
    }
153
    // NOTE: page cache lookup for *decompressed* page data is handled in
154
    // ColumnChunkReader::load_page_data(). PageReader should only be
155
    // responsible for parsing the header bytes from the file and saving
156
    // them in `_header_buf` for possible later insertion into the cache.
157
2
    while (true) {
158
2
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
159
0
            return Status::EndOfFile("stop");
160
0
        }
161
2
        header_size = std::min(header_size, max_size);
162
2
        {
163
2
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
164
2
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
165
2
        }
166
2
        real_header_size = cast_set<uint32_t>(header_size);
167
2
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
168
2
        auto st =
169
2
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
170
2
        if (st.ok()) {
171
2
            break;
172
2
        }
173
0
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
174
0
            return Status::IOError(
175
0
                    "Failed to deserialize parquet page header. offset: {}, "
176
0
                    "header size: {}, end offset: {}, real header size: {}",
177
0
                    _offset, header_size, _end_offset, real_header_size);
178
0
        }
179
0
        header_size <<= 2;
180
0
    }
181
182
2
    if constexpr (OFFSET_INDEX == false) {
183
2
        if (is_header_v2()) {
184
0
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
185
2
        } else if constexpr (!IN_COLLECTION) {
186
2
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
187
2
        }
188
2
    }
189
190
    // Save header bytes for possible cache insertion later
191
2
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
192
2
    _last_header_size = real_header_size;
193
2
    _page_statistics.parse_page_header_num++;
194
2
    _offset += real_header_size;
195
2
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
196
2
    _state = HEADER_PARSED;
197
2
    return Status::OK();
198
2
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE17parse_page_headerEv
_ZN5doris10PageReaderILb0ELb0EE17parse_page_headerEv
Line
Count
Source
77
187
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
78
187
    if (_state == HEADER_PARSED) {
79
0
        return Status::OK();
80
0
    }
81
187
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
82
0
        return Status::IOError("Out-of-bounds Access");
83
0
    }
84
187
    if (UNLIKELY(_offset != _next_header_offset)) {
85
0
        return Status::IOError("Wrong header position, should seek to a page header first");
86
0
    }
87
187
    if (UNLIKELY(_state != INITIALIZED)) {
88
0
        return Status::IOError("Should skip or load current page to get next page");
89
0
    }
90
91
187
    _page_statistics.page_read_counter += 1;
92
93
    // Parse page header from file; header bytes are saved for possible cache insertion
94
187
    const uint8_t* page_header_buf = nullptr;
95
187
    size_t max_size = _end_offset - _offset;
96
187
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
97
187
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
98
187
    uint32_t real_header_size = 0;
99
100
    // Try a header-only lookup in the page cache. Cached pages store
101
    // header + optional v2 levels + uncompressed payload, so we can
102
    // parse the page header directly from the cached bytes and avoid
103
    // a file read for the header.
104
187
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
105
187
        StoragePageCache::instance() != nullptr) {
106
187
        PageCacheHandle handle;
107
187
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
108
187
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
109
            // Parse header directly from cached data
110
109
            _page_cache_handle = std::move(handle);
111
109
            Slice s = _page_cache_handle.data();
112
109
            real_header_size = cast_set<uint32_t>(s.size);
113
109
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
114
109
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
115
109
                                             &real_header_size, true, &_cur_page_header);
116
109
            if (!st.ok()) return st;
117
            // Increment page cache counters for a true cache hit on header+payload
118
109
            _page_statistics.page_cache_hit_counter += 1;
119
            // Detect whether the cached payload is compressed or decompressed and record
120
109
            bool is_cache_payload_decompressed =
121
109
                    should_cache_decompressed(&_cur_page_header, _metadata);
122
123
109
            if (is_cache_payload_decompressed) {
124
109
                _page_statistics.page_cache_decompressed_hit_counter += 1;
125
109
            } else {
126
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
127
0
            }
128
129
109
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
130
131
109
            if constexpr (OFFSET_INDEX == false) {
132
109
                if (is_header_v2()) {
133
8
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
134
101
                } else if constexpr (!IN_COLLECTION) {
135
101
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
136
101
                }
137
109
            }
138
139
            // Save header bytes for later use (e.g., to insert updated cache entries)
140
109
            _header_buf.assign(s.data, s.data + real_header_size);
141
109
            _last_header_size = real_header_size;
142
109
            _page_statistics.parse_page_header_num++;
143
109
            _offset += real_header_size;
144
109
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
145
109
            _state = HEADER_PARSED;
146
109
            return Status::OK();
147
109
        } else {
148
78
            _page_statistics.page_cache_missing_counter += 1;
149
            // Clear any existing cache handle on miss to avoid holding stale handle
150
78
            _page_cache_handle = PageCacheHandle();
151
78
        }
152
187
    }
153
    // NOTE: page cache lookup for *decompressed* page data is handled in
154
    // ColumnChunkReader::load_page_data(). PageReader should only be
155
    // responsible for parsing the header bytes from the file and saving
156
    // them in `_header_buf` for possible later insertion into the cache.
157
79
    while (true) {
158
79
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
159
0
            return Status::EndOfFile("stop");
160
0
        }
161
79
        header_size = std::min(header_size, max_size);
162
79
        {
163
79
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
164
79
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
165
79
        }
166
79
        real_header_size = cast_set<uint32_t>(header_size);
167
79
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
168
79
        auto st =
169
79
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
170
79
        if (st.ok()) {
171
78
            break;
172
78
        }
173
1
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
174
0
            return Status::IOError(
175
0
                    "Failed to deserialize parquet page header. offset: {}, "
176
0
                    "header size: {}, end offset: {}, real header size: {}",
177
0
                    _offset, header_size, _end_offset, real_header_size);
178
0
        }
179
1
        header_size <<= 2;
180
1
    }
181
182
78
    if constexpr (OFFSET_INDEX == false) {
183
78
        if (is_header_v2()) {
184
4
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
185
74
        } else if constexpr (!IN_COLLECTION) {
186
74
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
187
74
        }
188
78
    }
189
190
    // Save header bytes for possible cache insertion later
191
78
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
192
78
    _last_header_size = real_header_size;
193
78
    _page_statistics.parse_page_header_num++;
194
78
    _offset += real_header_size;
195
78
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
196
78
    _state = HEADER_PARSED;
197
78
    return Status::OK();
198
78
}
199
200
template <bool IN_COLLECTION, bool OFFSET_INDEX>
201
79
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
202
79
    if (UNLIKELY(_state != HEADER_PARSED)) {
203
0
        return Status::IOError("Should generate page header first to load current page data");
204
0
    }
205
79
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
206
0
        return Status::EndOfFile("stop");
207
0
    }
208
79
    slice.size = _cur_page_header.compressed_page_size;
209
79
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
210
79
    _offset += slice.size;
211
79
    _state = DATA_LOADED;
212
79
    return Status::OK();
213
79
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE13get_page_dataERNS_5SliceE
_ZN5doris10PageReaderILb1ELb0EE13get_page_dataERNS_5SliceE
Line
Count
Source
201
2
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
202
2
    if (UNLIKELY(_state != HEADER_PARSED)) {
203
0
        return Status::IOError("Should generate page header first to load current page data");
204
0
    }
205
2
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
206
0
        return Status::EndOfFile("stop");
207
0
    }
208
2
    slice.size = _cur_page_header.compressed_page_size;
209
2
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
210
2
    _offset += slice.size;
211
2
    _state = DATA_LOADED;
212
2
    return Status::OK();
213
2
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE13get_page_dataERNS_5SliceE
_ZN5doris10PageReaderILb0ELb0EE13get_page_dataERNS_5SliceE
Line
Count
Source
201
77
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
202
77
    if (UNLIKELY(_state != HEADER_PARSED)) {
203
0
        return Status::IOError("Should generate page header first to load current page data");
204
0
    }
205
77
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
206
0
        return Status::EndOfFile("stop");
207
0
    }
208
77
    slice.size = _cur_page_header.compressed_page_size;
209
77
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
210
77
    _offset += slice.size;
211
77
    _state = DATA_LOADED;
212
77
    return Status::OK();
213
77
}
214
215
template class PageReader<true, true>;
216
template class PageReader<true, false>;
217
template class PageReader<false, true>;
218
template class PageReader<false, false>;
219
220
} // namespace doris