Coverage Report

Created: 2026-04-15 20:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_page_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/vparquet_page_reader.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <stddef.h>
23
#include <stdint.h>
24
25
#include <algorithm>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/config.h"
29
#include "format/parquet/parquet_common.h"
30
#include "io/fs/buffered_reader.h"
31
#include "runtime/runtime_profile.h"
32
#include "storage/cache/page_cache.h"
33
#include "util/slice.h"
34
#include "util/thrift_util.h"
35
36
// Forward declaration only: this file passes IOContext around by pointer and reads
// `should_stop` through it, relying on the full definition being visible via the includes.
namespace doris::io {
struct IOContext;
} // namespace doris::io
41
42
namespace doris {
43
// Size in bytes of the first read window used when decoding a page header from the file;
// parse_page_header() widens the window (x4 per retry) if the header does not fit.
constexpr size_t INIT_PAGE_HEADER_SIZE {128};
44
45
167
void ParquetPageCacheKeyBuilder::init(const std::string& path, int64_t mtime) {
46
167
    _file_key_prefix = fmt::format("{}::{}", path, mtime);
47
167
}
48
49
template <bool IN_COLLECTION, bool OFFSET_INDEX>
50
PageReader<IN_COLLECTION, OFFSET_INDEX>::PageReader(io::BufferedStreamReader* reader,
51
                                                    io::IOContext* io_ctx, uint64_t offset,
52
                                                    uint64_t length, size_t total_rows,
53
                                                    const tparquet::ColumnMetaData& metadata,
54
                                                    const ParquetPageReadContext& page_read_ctx,
55
                                                    const tparquet::OffsetIndex* offset_index)
56
167
        : _reader(reader),
57
167
          _io_ctx(io_ctx),
58
167
          _offset(offset),
59
167
          _start_offset(offset),
60
167
          _end_offset(offset + length),
61
167
          _total_rows(total_rows),
62
167
          _metadata(metadata),
63
167
          _page_read_ctx(page_read_ctx),
64
167
          _offset_index(offset_index) {
65
167
    _next_header_offset = _offset;
66
167
    _state = INITIALIZED;
67
167
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
68
69
167
    if constexpr (OFFSET_INDEX) {
70
0
        _end_row = _offset_index->page_locations.size() >= 2
71
0
                           ? _offset_index->page_locations[1].first_row_index
72
0
                           : _total_rows;
73
0
    }
74
167
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
_ZN5doris10PageReaderILb1ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
Line
Count
Source
56
2
        : _reader(reader),
57
2
          _io_ctx(io_ctx),
58
2
          _offset(offset),
59
2
          _start_offset(offset),
60
2
          _end_offset(offset + length),
61
2
          _total_rows(total_rows),
62
2
          _metadata(metadata),
63
2
          _page_read_ctx(page_read_ctx),
64
2
          _offset_index(offset_index) {
65
2
    _next_header_offset = _offset;
66
2
    _state = INITIALIZED;
67
2
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
68
69
    if constexpr (OFFSET_INDEX) {
70
        _end_row = _offset_index->page_locations.size() >= 2
71
                           ? _offset_index->page_locations[1].first_row_index
72
                           : _total_rows;
73
    }
74
2
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
_ZN5doris10PageReaderILb0ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
Line
Count
Source
56
165
        : _reader(reader),
57
165
          _io_ctx(io_ctx),
58
165
          _offset(offset),
59
165
          _start_offset(offset),
60
165
          _end_offset(offset + length),
61
165
          _total_rows(total_rows),
62
165
          _metadata(metadata),
63
165
          _page_read_ctx(page_read_ctx),
64
165
          _offset_index(offset_index) {
65
165
    _next_header_offset = _offset;
66
165
    _state = INITIALIZED;
67
165
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
68
69
    if constexpr (OFFSET_INDEX) {
70
        _end_row = _offset_index->page_locations.size() >= 2
71
                           ? _offset_index->page_locations[1].first_row_index
72
                           : _total_rows;
73
    }
74
165
}
75
76
template <bool IN_COLLECTION, bool OFFSET_INDEX>
77
196
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
78
196
    if (_state == HEADER_PARSED) {
79
0
        return Status::OK();
80
0
    }
81
196
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
82
0
        return Status::IOError("Out-of-bounds Access");
83
0
    }
84
196
    if (UNLIKELY(_offset != _next_header_offset)) {
85
0
        return Status::IOError("Wrong header position, should seek to a page header first");
86
0
    }
87
196
    if (UNLIKELY(_state != INITIALIZED)) {
88
0
        return Status::IOError("Should skip or load current page to get next page");
89
0
    }
90
91
196
    _page_statistics.page_read_counter += 1;
92
93
    // Parse page header from file; header bytes are saved for possible cache insertion
94
196
    const uint8_t* page_header_buf = nullptr;
95
196
    size_t max_size = _end_offset - _offset;
96
196
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
97
196
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
98
196
    uint32_t real_header_size = 0;
99
100
    // Try a header-only lookup in the page cache. Cached pages store
101
    // header + optional v2 levels + uncompressed payload, so we can
102
    // parse the page header directly from the cached bytes and avoid
103
    // a file read for the header.
104
196
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
105
196
        StoragePageCache::instance() != nullptr) {
106
196
        PageCacheHandle handle;
107
196
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
108
196
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
109
            // Parse header directly from cached data
110
121
            _page_cache_handle = std::move(handle);
111
121
            Slice s = _page_cache_handle.data();
112
121
            real_header_size = cast_set<uint32_t>(s.size);
113
121
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
114
121
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
115
121
                                             &real_header_size, true, &_cur_page_header);
116
121
            if (!st.ok()) return st;
117
            // Increment page cache counters for a true cache hit on header+payload
118
121
            _page_statistics.page_cache_hit_counter += 1;
119
            // Detect whether the cached payload is compressed or decompressed and record
120
121
            bool is_cache_payload_decompressed =
121
121
                    should_cache_decompressed(&_cur_page_header, _metadata);
122
123
121
            if (is_cache_payload_decompressed) {
124
121
                _page_statistics.page_cache_decompressed_hit_counter += 1;
125
121
            } else {
126
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
127
0
            }
128
129
121
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
130
131
121
            if constexpr (OFFSET_INDEX == false) {
132
121
                if (is_header_v2()) {
133
8
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
134
112
                } else if constexpr (!IN_COLLECTION) {
135
112
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
136
112
                }
137
121
            }
138
139
            // Save header bytes for later use (e.g., to insert updated cache entries)
140
121
            _header_buf.assign(s.data, s.data + real_header_size);
141
121
            _last_header_size = real_header_size;
142
121
            _page_statistics.parse_page_header_num++;
143
121
            _offset += real_header_size;
144
121
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
145
121
            _state = HEADER_PARSED;
146
121
            return Status::OK();
147
121
        } else {
148
75
            _page_statistics.page_cache_missing_counter += 1;
149
            // Clear any existing cache handle on miss to avoid holding stale handle
150
75
            _page_cache_handle = PageCacheHandle();
151
75
        }
152
196
    }
153
    // NOTE: page cache lookup for *decompressed* page data is handled in
154
    // ColumnChunkReader::load_page_data(). PageReader should only be
155
    // responsible for parsing the header bytes from the file and saving
156
    // them in `_header_buf` for possible later insertion into the cache.
157
76
    while (true) {
158
76
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
159
0
            return Status::EndOfFile("stop");
160
0
        }
161
76
        header_size = std::min(header_size, max_size);
162
76
        {
163
76
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
164
76
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
165
76
        }
166
76
        real_header_size = cast_set<uint32_t>(header_size);
167
76
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
168
76
        auto st =
169
76
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
170
76
        if (st.ok()) {
171
75
            break;
172
75
        }
173
1
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
174
0
            return Status::IOError(
175
0
                    "Failed to deserialize parquet page header. offset: {}, "
176
0
                    "header size: {}, end offset: {}, real header size: {}",
177
0
                    _offset, header_size, _end_offset, real_header_size);
178
0
        }
179
1
        header_size <<= 2;
180
1
    }
181
182
75
    if constexpr (OFFSET_INDEX == false) {
183
75
        if (is_header_v2()) {
184
4
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
185
70
        } else if constexpr (!IN_COLLECTION) {
186
70
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
187
70
        }
188
75
    }
189
190
    // Save header bytes for possible cache insertion later
191
75
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
192
75
    _last_header_size = real_header_size;
193
75
    _page_statistics.parse_page_header_num++;
194
75
    _offset += real_header_size;
195
75
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
196
75
    _state = HEADER_PARSED;
197
75
    return Status::OK();
198
75
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE17parse_page_headerEv
_ZN5doris10PageReaderILb1ELb0EE17parse_page_headerEv
Line
Count
Source
77
2
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
78
2
    if (_state == HEADER_PARSED) {
79
0
        return Status::OK();
80
0
    }
81
2
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
82
0
        return Status::IOError("Out-of-bounds Access");
83
0
    }
84
2
    if (UNLIKELY(_offset != _next_header_offset)) {
85
0
        return Status::IOError("Wrong header position, should seek to a page header first");
86
0
    }
87
2
    if (UNLIKELY(_state != INITIALIZED)) {
88
0
        return Status::IOError("Should skip or load current page to get next page");
89
0
    }
90
91
2
    _page_statistics.page_read_counter += 1;
92
93
    // Parse page header from file; header bytes are saved for possible cache insertion
94
2
    const uint8_t* page_header_buf = nullptr;
95
2
    size_t max_size = _end_offset - _offset;
96
2
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
97
2
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
98
2
    uint32_t real_header_size = 0;
99
100
    // Try a header-only lookup in the page cache. Cached pages store
101
    // header + optional v2 levels + uncompressed payload, so we can
102
    // parse the page header directly from the cached bytes and avoid
103
    // a file read for the header.
104
2
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
105
2
        StoragePageCache::instance() != nullptr) {
106
2
        PageCacheHandle handle;
107
2
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
108
2
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
109
            // Parse header directly from cached data
110
1
            _page_cache_handle = std::move(handle);
111
1
            Slice s = _page_cache_handle.data();
112
1
            real_header_size = cast_set<uint32_t>(s.size);
113
1
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
114
1
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
115
1
                                             &real_header_size, true, &_cur_page_header);
116
1
            if (!st.ok()) return st;
117
            // Increment page cache counters for a true cache hit on header+payload
118
1
            _page_statistics.page_cache_hit_counter += 1;
119
            // Detect whether the cached payload is compressed or decompressed and record
120
1
            bool is_cache_payload_decompressed =
121
1
                    should_cache_decompressed(&_cur_page_header, _metadata);
122
123
1
            if (is_cache_payload_decompressed) {
124
1
                _page_statistics.page_cache_decompressed_hit_counter += 1;
125
1
            } else {
126
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
127
0
            }
128
129
1
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
130
131
1
            if constexpr (OFFSET_INDEX == false) {
132
1
                if (is_header_v2()) {
133
0
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
134
1
                } else if constexpr (!IN_COLLECTION) {
135
1
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
136
1
                }
137
1
            }
138
139
            // Save header bytes for later use (e.g., to insert updated cache entries)
140
1
            _header_buf.assign(s.data, s.data + real_header_size);
141
1
            _last_header_size = real_header_size;
142
1
            _page_statistics.parse_page_header_num++;
143
1
            _offset += real_header_size;
144
1
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
145
1
            _state = HEADER_PARSED;
146
1
            return Status::OK();
147
1
        } else {
148
1
            _page_statistics.page_cache_missing_counter += 1;
149
            // Clear any existing cache handle on miss to avoid holding stale handle
150
1
            _page_cache_handle = PageCacheHandle();
151
1
        }
152
2
    }
153
    // NOTE: page cache lookup for *decompressed* page data is handled in
154
    // ColumnChunkReader::load_page_data(). PageReader should only be
155
    // responsible for parsing the header bytes from the file and saving
156
    // them in `_header_buf` for possible later insertion into the cache.
157
1
    while (true) {
158
1
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
159
0
            return Status::EndOfFile("stop");
160
0
        }
161
1
        header_size = std::min(header_size, max_size);
162
1
        {
163
1
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
164
1
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
165
1
        }
166
1
        real_header_size = cast_set<uint32_t>(header_size);
167
1
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
168
1
        auto st =
169
1
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
170
1
        if (st.ok()) {
171
1
            break;
172
1
        }
173
0
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
174
0
            return Status::IOError(
175
0
                    "Failed to deserialize parquet page header. offset: {}, "
176
0
                    "header size: {}, end offset: {}, real header size: {}",
177
0
                    _offset, header_size, _end_offset, real_header_size);
178
0
        }
179
0
        header_size <<= 2;
180
0
    }
181
182
1
    if constexpr (OFFSET_INDEX == false) {
183
1
        if (is_header_v2()) {
184
0
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
185
1
        } else if constexpr (!IN_COLLECTION) {
186
1
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
187
1
        }
188
1
    }
189
190
    // Save header bytes for possible cache insertion later
191
1
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
192
1
    _last_header_size = real_header_size;
193
1
    _page_statistics.parse_page_header_num++;
194
1
    _offset += real_header_size;
195
1
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
196
1
    _state = HEADER_PARSED;
197
1
    return Status::OK();
198
1
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE17parse_page_headerEv
_ZN5doris10PageReaderILb0ELb0EE17parse_page_headerEv
Line
Count
Source
77
194
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
78
194
    if (_state == HEADER_PARSED) {
79
0
        return Status::OK();
80
0
    }
81
194
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
82
0
        return Status::IOError("Out-of-bounds Access");
83
0
    }
84
194
    if (UNLIKELY(_offset != _next_header_offset)) {
85
0
        return Status::IOError("Wrong header position, should seek to a page header first");
86
0
    }
87
194
    if (UNLIKELY(_state != INITIALIZED)) {
88
0
        return Status::IOError("Should skip or load current page to get next page");
89
0
    }
90
91
194
    _page_statistics.page_read_counter += 1;
92
93
    // Parse page header from file; header bytes are saved for possible cache insertion
94
194
    const uint8_t* page_header_buf = nullptr;
95
194
    size_t max_size = _end_offset - _offset;
96
194
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
97
194
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
98
194
    uint32_t real_header_size = 0;
99
100
    // Try a header-only lookup in the page cache. Cached pages store
101
    // header + optional v2 levels + uncompressed payload, so we can
102
    // parse the page header directly from the cached bytes and avoid
103
    // a file read for the header.
104
194
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
105
194
        StoragePageCache::instance() != nullptr) {
106
194
        PageCacheHandle handle;
107
194
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
108
194
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
109
            // Parse header directly from cached data
110
120
            _page_cache_handle = std::move(handle);
111
120
            Slice s = _page_cache_handle.data();
112
120
            real_header_size = cast_set<uint32_t>(s.size);
113
120
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
114
120
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
115
120
                                             &real_header_size, true, &_cur_page_header);
116
120
            if (!st.ok()) return st;
117
            // Increment page cache counters for a true cache hit on header+payload
118
120
            _page_statistics.page_cache_hit_counter += 1;
119
            // Detect whether the cached payload is compressed or decompressed and record
120
120
            bool is_cache_payload_decompressed =
121
120
                    should_cache_decompressed(&_cur_page_header, _metadata);
122
123
120
            if (is_cache_payload_decompressed) {
124
120
                _page_statistics.page_cache_decompressed_hit_counter += 1;
125
120
            } else {
126
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
127
0
            }
128
129
120
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
130
131
120
            if constexpr (OFFSET_INDEX == false) {
132
120
                if (is_header_v2()) {
133
8
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
134
112
                } else if constexpr (!IN_COLLECTION) {
135
112
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
136
112
                }
137
120
            }
138
139
            // Save header bytes for later use (e.g., to insert updated cache entries)
140
120
            _header_buf.assign(s.data, s.data + real_header_size);
141
120
            _last_header_size = real_header_size;
142
120
            _page_statistics.parse_page_header_num++;
143
120
            _offset += real_header_size;
144
120
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
145
120
            _state = HEADER_PARSED;
146
120
            return Status::OK();
147
120
        } else {
148
74
            _page_statistics.page_cache_missing_counter += 1;
149
            // Clear any existing cache handle on miss to avoid holding stale handle
150
74
            _page_cache_handle = PageCacheHandle();
151
74
        }
152
194
    }
153
    // NOTE: page cache lookup for *decompressed* page data is handled in
154
    // ColumnChunkReader::load_page_data(). PageReader should only be
155
    // responsible for parsing the header bytes from the file and saving
156
    // them in `_header_buf` for possible later insertion into the cache.
157
75
    while (true) {
158
75
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
159
0
            return Status::EndOfFile("stop");
160
0
        }
161
75
        header_size = std::min(header_size, max_size);
162
75
        {
163
75
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
164
75
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
165
75
        }
166
75
        real_header_size = cast_set<uint32_t>(header_size);
167
75
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
168
75
        auto st =
169
75
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
170
75
        if (st.ok()) {
171
74
            break;
172
74
        }
173
1
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
174
0
            return Status::IOError(
175
0
                    "Failed to deserialize parquet page header. offset: {}, "
176
0
                    "header size: {}, end offset: {}, real header size: {}",
177
0
                    _offset, header_size, _end_offset, real_header_size);
178
0
        }
179
1
        header_size <<= 2;
180
1
    }
181
182
74
    if constexpr (OFFSET_INDEX == false) {
183
74
        if (is_header_v2()) {
184
4
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
185
70
        } else if constexpr (!IN_COLLECTION) {
186
70
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
187
70
        }
188
74
    }
189
190
    // Save header bytes for possible cache insertion later
191
74
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
192
74
    _last_header_size = real_header_size;
193
74
    _page_statistics.parse_page_header_num++;
194
74
    _offset += real_header_size;
195
74
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
196
74
    _state = HEADER_PARSED;
197
74
    return Status::OK();
198
74
}
199
200
template <bool IN_COLLECTION, bool OFFSET_INDEX>
201
74
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
202
74
    if (UNLIKELY(_state != HEADER_PARSED)) {
203
0
        return Status::IOError("Should generate page header first to load current page data");
204
0
    }
205
74
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
206
0
        return Status::EndOfFile("stop");
207
0
    }
208
74
    slice.size = _cur_page_header.compressed_page_size;
209
74
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
210
74
    _offset += slice.size;
211
74
    _state = DATA_LOADED;
212
74
    return Status::OK();
213
74
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE13get_page_dataERNS_5SliceE
_ZN5doris10PageReaderILb1ELb0EE13get_page_dataERNS_5SliceE
Line
Count
Source
201
1
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
202
1
    if (UNLIKELY(_state != HEADER_PARSED)) {
203
0
        return Status::IOError("Should generate page header first to load current page data");
204
0
    }
205
1
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
206
0
        return Status::EndOfFile("stop");
207
0
    }
208
1
    slice.size = _cur_page_header.compressed_page_size;
209
1
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
210
1
    _offset += slice.size;
211
1
    _state = DATA_LOADED;
212
1
    return Status::OK();
213
1
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE13get_page_dataERNS_5SliceE
_ZN5doris10PageReaderILb0ELb0EE13get_page_dataERNS_5SliceE
Line
Count
Source
201
73
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
202
73
    if (UNLIKELY(_state != HEADER_PARSED)) {
203
0
        return Status::IOError("Should generate page header first to load current page data");
204
0
    }
205
73
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
206
0
        return Status::EndOfFile("stop");
207
0
    }
208
73
    slice.size = _cur_page_header.compressed_page_size;
209
73
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
210
73
    _offset += slice.size;
211
73
    _state = DATA_LOADED;
212
73
    return Status::OK();
213
73
}
214
215
// The member definitions above live only in this translation unit, so every
// (IN_COLLECTION, OFFSET_INDEX) combination is explicitly instantiated here.
template class PageReader<false, false>;
template class PageReader<false, true>;
template class PageReader<true, false>;
template class PageReader<true, true>;
219
220
} // namespace doris