Coverage Report

Created: 2026-03-19 16:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_page_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/vparquet_page_reader.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <stddef.h>
23
#include <stdint.h>
24
25
#include <algorithm>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/config.h"
29
#include "format/parquet/parquet_common.h"
30
#include "io/fs/buffered_reader.h"
31
#include "runtime/runtime_profile.h"
32
#include "storage/cache/page_cache.h"
33
#include "util/slice.h"
34
#include "util/thrift_util.h"
35
36
namespace doris {
37
namespace io {
38
struct IOContext;
39
} // namespace io
40
} // namespace doris
41
42
namespace doris {
43
#include "common/compile_check_begin.h"
44
static constexpr size_t INIT_PAGE_HEADER_SIZE = 128;
45
46
165
void ParquetPageCacheKeyBuilder::init(const std::string& path, int64_t mtime) {
47
165
    _file_key_prefix = fmt::format("{}::{}", path, mtime);
48
165
}
49
50
template <bool IN_COLLECTION, bool OFFSET_INDEX>
51
PageReader<IN_COLLECTION, OFFSET_INDEX>::PageReader(io::BufferedStreamReader* reader,
52
                                                    io::IOContext* io_ctx, uint64_t offset,
53
                                                    uint64_t length, size_t total_rows,
54
                                                    const tparquet::ColumnMetaData& metadata,
55
                                                    const ParquetPageReadContext& page_read_ctx,
56
                                                    const tparquet::OffsetIndex* offset_index)
57
165
        : _reader(reader),
58
165
          _io_ctx(io_ctx),
59
165
          _offset(offset),
60
165
          _start_offset(offset),
61
165
          _end_offset(offset + length),
62
165
          _total_rows(total_rows),
63
165
          _metadata(metadata),
64
165
          _page_read_ctx(page_read_ctx),
65
165
          _offset_index(offset_index) {
66
165
    _next_header_offset = _offset;
67
165
    _state = INITIALIZED;
68
165
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
69
70
165
    if constexpr (OFFSET_INDEX) {
71
0
        _end_row = _offset_index->page_locations.size() >= 2
72
0
                           ? _offset_index->page_locations[1].first_row_index
73
0
                           : _total_rows;
74
0
    }
75
165
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
_ZN5doris10PageReaderILb1ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
Line
Count
Source
57
2
        : _reader(reader),
58
2
          _io_ctx(io_ctx),
59
2
          _offset(offset),
60
2
          _start_offset(offset),
61
2
          _end_offset(offset + length),
62
2
          _total_rows(total_rows),
63
2
          _metadata(metadata),
64
2
          _page_read_ctx(page_read_ctx),
65
2
          _offset_index(offset_index) {
66
2
    _next_header_offset = _offset;
67
2
    _state = INITIALIZED;
68
2
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
69
70
    if constexpr (OFFSET_INDEX) {
71
        _end_row = _offset_index->page_locations.size() >= 2
72
                           ? _offset_index->page_locations[1].first_row_index
73
                           : _total_rows;
74
    }
75
2
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
_ZN5doris10PageReaderILb0ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
Line
Count
Source
57
163
        : _reader(reader),
58
163
          _io_ctx(io_ctx),
59
163
          _offset(offset),
60
163
          _start_offset(offset),
61
163
          _end_offset(offset + length),
62
163
          _total_rows(total_rows),
63
163
          _metadata(metadata),
64
163
          _page_read_ctx(page_read_ctx),
65
163
          _offset_index(offset_index) {
66
163
    _next_header_offset = _offset;
67
163
    _state = INITIALIZED;
68
163
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
69
70
    if constexpr (OFFSET_INDEX) {
71
        _end_row = _offset_index->page_locations.size() >= 2
72
                           ? _offset_index->page_locations[1].first_row_index
73
                           : _total_rows;
74
    }
75
163
}
76
77
template <bool IN_COLLECTION, bool OFFSET_INDEX>
78
193
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
79
193
    if (_state == HEADER_PARSED) {
80
0
        return Status::OK();
81
0
    }
82
193
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
83
0
        return Status::IOError("Out-of-bounds Access");
84
0
    }
85
193
    if (UNLIKELY(_offset != _next_header_offset)) {
86
0
        return Status::IOError("Wrong header position, should seek to a page header first");
87
0
    }
88
193
    if (UNLIKELY(_state != INITIALIZED)) {
89
0
        return Status::IOError("Should skip or load current page to get next page");
90
0
    }
91
92
193
    _page_statistics.page_read_counter += 1;
93
94
    // Parse page header from file; header bytes are saved for possible cache insertion
95
193
    const uint8_t* page_header_buf = nullptr;
96
193
    size_t max_size = _end_offset - _offset;
97
193
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
98
193
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
99
193
    uint32_t real_header_size = 0;
100
101
    // Try a header-only lookup in the page cache. Cached pages store
102
    // header + optional v2 levels + uncompressed payload, so we can
103
    // parse the page header directly from the cached bytes and avoid
104
    // a file read for the header.
105
193
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
106
193
        StoragePageCache::instance() != nullptr) {
107
193
        PageCacheHandle handle;
108
193
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
109
193
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
110
            // Parse header directly from cached data
111
121
            _page_cache_handle = std::move(handle);
112
121
            Slice s = _page_cache_handle.data();
113
121
            real_header_size = cast_set<uint32_t>(s.size);
114
121
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
115
121
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
116
121
                                             &real_header_size, true, &_cur_page_header);
117
121
            if (!st.ok()) return st;
118
            // Increment page cache counters for a true cache hit on header+payload
119
121
            _page_statistics.page_cache_hit_counter += 1;
120
            // Detect whether the cached payload is compressed or decompressed and record
121
121
            bool is_cache_payload_decompressed =
122
121
                    should_cache_decompressed(&_cur_page_header, _metadata);
123
124
121
            if (is_cache_payload_decompressed) {
125
121
                _page_statistics.page_cache_decompressed_hit_counter += 1;
126
121
            } else {
127
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
128
0
            }
129
130
121
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
131
132
121
            if constexpr (OFFSET_INDEX == false) {
133
121
                if (is_header_v2()) {
134
8
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
135
112
                } else if constexpr (!IN_COLLECTION) {
136
112
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
137
112
                }
138
121
            }
139
140
            // Save header bytes for later use (e.g., to insert updated cache entries)
141
121
            _header_buf.assign(s.data, s.data + real_header_size);
142
121
            _last_header_size = real_header_size;
143
121
            _page_statistics.parse_page_header_num++;
144
121
            _offset += real_header_size;
145
121
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
146
121
            _state = HEADER_PARSED;
147
121
            return Status::OK();
148
121
        } else {
149
72
            _page_statistics.page_cache_missing_counter += 1;
150
            // Clear any existing cache handle on miss to avoid holding stale handle
151
72
            _page_cache_handle = PageCacheHandle();
152
72
        }
153
193
    }
154
    // NOTE: page cache lookup for *decompressed* page data is handled in
155
    // ColumnChunkReader::load_page_data(). PageReader should only be
156
    // responsible for parsing the header bytes from the file and saving
157
    // them in `_header_buf` for possible later insertion into the cache.
158
72
    while (true) {
159
72
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
160
0
            return Status::EndOfFile("stop");
161
0
        }
162
72
        header_size = std::min(header_size, max_size);
163
72
        {
164
72
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
165
72
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
166
72
        }
167
72
        real_header_size = cast_set<uint32_t>(header_size);
168
72
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
169
72
        auto st =
170
72
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
171
72
        if (st.ok()) {
172
72
            break;
173
72
        }
174
0
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
175
0
            return Status::IOError(
176
0
                    "Failed to deserialize parquet page header. offset: {}, "
177
0
                    "header size: {}, end offset: {}, real header size: {}",
178
0
                    _offset, header_size, _end_offset, real_header_size);
179
0
        }
180
0
        header_size <<= 2;
181
0
    }
182
183
72
    if constexpr (OFFSET_INDEX == false) {
184
72
        if (is_header_v2()) {
185
4
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
186
67
        } else if constexpr (!IN_COLLECTION) {
187
67
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
188
67
        }
189
72
    }
190
191
    // Save header bytes for possible cache insertion later
192
72
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
193
72
    _last_header_size = real_header_size;
194
72
    _page_statistics.parse_page_header_num++;
195
72
    _offset += real_header_size;
196
72
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
197
72
    _state = HEADER_PARSED;
198
72
    return Status::OK();
199
72
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE17parse_page_headerEv
_ZN5doris10PageReaderILb1ELb0EE17parse_page_headerEv
Line
Count
Source
78
2
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
79
2
    if (_state == HEADER_PARSED) {
80
0
        return Status::OK();
81
0
    }
82
2
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
83
0
        return Status::IOError("Out-of-bounds Access");
84
0
    }
85
2
    if (UNLIKELY(_offset != _next_header_offset)) {
86
0
        return Status::IOError("Wrong header position, should seek to a page header first");
87
0
    }
88
2
    if (UNLIKELY(_state != INITIALIZED)) {
89
0
        return Status::IOError("Should skip or load current page to get next page");
90
0
    }
91
92
2
    _page_statistics.page_read_counter += 1;
93
94
    // Parse page header from file; header bytes are saved for possible cache insertion
95
2
    const uint8_t* page_header_buf = nullptr;
96
2
    size_t max_size = _end_offset - _offset;
97
2
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
98
2
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
99
2
    uint32_t real_header_size = 0;
100
101
    // Try a header-only lookup in the page cache. Cached pages store
102
    // header + optional v2 levels + uncompressed payload, so we can
103
    // parse the page header directly from the cached bytes and avoid
104
    // a file read for the header.
105
2
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
106
2
        StoragePageCache::instance() != nullptr) {
107
2
        PageCacheHandle handle;
108
2
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
109
2
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
110
            // Parse header directly from cached data
111
1
            _page_cache_handle = std::move(handle);
112
1
            Slice s = _page_cache_handle.data();
113
1
            real_header_size = cast_set<uint32_t>(s.size);
114
1
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
115
1
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
116
1
                                             &real_header_size, true, &_cur_page_header);
117
1
            if (!st.ok()) return st;
118
            // Increment page cache counters for a true cache hit on header+payload
119
1
            _page_statistics.page_cache_hit_counter += 1;
120
            // Detect whether the cached payload is compressed or decompressed and record
121
1
            bool is_cache_payload_decompressed =
122
1
                    should_cache_decompressed(&_cur_page_header, _metadata);
123
124
1
            if (is_cache_payload_decompressed) {
125
1
                _page_statistics.page_cache_decompressed_hit_counter += 1;
126
1
            } else {
127
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
128
0
            }
129
130
1
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
131
132
1
            if constexpr (OFFSET_INDEX == false) {
133
1
                if (is_header_v2()) {
134
0
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
135
1
                } else if constexpr (!IN_COLLECTION) {
136
1
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
137
1
                }
138
1
            }
139
140
            // Save header bytes for later use (e.g., to insert updated cache entries)
141
1
            _header_buf.assign(s.data, s.data + real_header_size);
142
1
            _last_header_size = real_header_size;
143
1
            _page_statistics.parse_page_header_num++;
144
1
            _offset += real_header_size;
145
1
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
146
1
            _state = HEADER_PARSED;
147
1
            return Status::OK();
148
1
        } else {
149
1
            _page_statistics.page_cache_missing_counter += 1;
150
            // Clear any existing cache handle on miss to avoid holding stale handle
151
1
            _page_cache_handle = PageCacheHandle();
152
1
        }
153
2
    }
154
    // NOTE: page cache lookup for *decompressed* page data is handled in
155
    // ColumnChunkReader::load_page_data(). PageReader should only be
156
    // responsible for parsing the header bytes from the file and saving
157
    // them in `_header_buf` for possible later insertion into the cache.
158
1
    while (true) {
159
1
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
160
0
            return Status::EndOfFile("stop");
161
0
        }
162
1
        header_size = std::min(header_size, max_size);
163
1
        {
164
1
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
165
1
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
166
1
        }
167
1
        real_header_size = cast_set<uint32_t>(header_size);
168
1
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
169
1
        auto st =
170
1
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
171
1
        if (st.ok()) {
172
1
            break;
173
1
        }
174
0
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
175
0
            return Status::IOError(
176
0
                    "Failed to deserialize parquet page header. offset: {}, "
177
0
                    "header size: {}, end offset: {}, real header size: {}",
178
0
                    _offset, header_size, _end_offset, real_header_size);
179
0
        }
180
0
        header_size <<= 2;
181
0
    }
182
183
1
    if constexpr (OFFSET_INDEX == false) {
184
1
        if (is_header_v2()) {
185
0
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
186
1
        } else if constexpr (!IN_COLLECTION) {
187
1
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
188
1
        }
189
1
    }
190
191
    // Save header bytes for possible cache insertion later
192
1
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
193
1
    _last_header_size = real_header_size;
194
1
    _page_statistics.parse_page_header_num++;
195
1
    _offset += real_header_size;
196
1
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
197
1
    _state = HEADER_PARSED;
198
1
    return Status::OK();
199
1
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE17parse_page_headerEv
_ZN5doris10PageReaderILb0ELb0EE17parse_page_headerEv
Line
Count
Source
78
191
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
79
191
    if (_state == HEADER_PARSED) {
80
0
        return Status::OK();
81
0
    }
82
191
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
83
0
        return Status::IOError("Out-of-bounds Access");
84
0
    }
85
191
    if (UNLIKELY(_offset != _next_header_offset)) {
86
0
        return Status::IOError("Wrong header position, should seek to a page header first");
87
0
    }
88
191
    if (UNLIKELY(_state != INITIALIZED)) {
89
0
        return Status::IOError("Should skip or load current page to get next page");
90
0
    }
91
92
191
    _page_statistics.page_read_counter += 1;
93
94
    // Parse page header from file; header bytes are saved for possible cache insertion
95
191
    const uint8_t* page_header_buf = nullptr;
96
191
    size_t max_size = _end_offset - _offset;
97
191
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
98
191
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
99
191
    uint32_t real_header_size = 0;
100
101
    // Try a header-only lookup in the page cache. Cached pages store
102
    // header + optional v2 levels + uncompressed payload, so we can
103
    // parse the page header directly from the cached bytes and avoid
104
    // a file read for the header.
105
191
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
106
191
        StoragePageCache::instance() != nullptr) {
107
191
        PageCacheHandle handle;
108
191
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
109
191
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
110
            // Parse header directly from cached data
111
120
            _page_cache_handle = std::move(handle);
112
120
            Slice s = _page_cache_handle.data();
113
120
            real_header_size = cast_set<uint32_t>(s.size);
114
120
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
115
120
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
116
120
                                             &real_header_size, true, &_cur_page_header);
117
120
            if (!st.ok()) return st;
118
            // Increment page cache counters for a true cache hit on header+payload
119
120
            _page_statistics.page_cache_hit_counter += 1;
120
            // Detect whether the cached payload is compressed or decompressed and record
121
120
            bool is_cache_payload_decompressed =
122
120
                    should_cache_decompressed(&_cur_page_header, _metadata);
123
124
120
            if (is_cache_payload_decompressed) {
125
120
                _page_statistics.page_cache_decompressed_hit_counter += 1;
126
120
            } else {
127
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
128
0
            }
129
130
120
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
131
132
120
            if constexpr (OFFSET_INDEX == false) {
133
120
                if (is_header_v2()) {
134
8
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
135
112
                } else if constexpr (!IN_COLLECTION) {
136
112
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
137
112
                }
138
120
            }
139
140
            // Save header bytes for later use (e.g., to insert updated cache entries)
141
120
            _header_buf.assign(s.data, s.data + real_header_size);
142
120
            _last_header_size = real_header_size;
143
120
            _page_statistics.parse_page_header_num++;
144
120
            _offset += real_header_size;
145
120
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
146
120
            _state = HEADER_PARSED;
147
120
            return Status::OK();
148
120
        } else {
149
71
            _page_statistics.page_cache_missing_counter += 1;
150
            // Clear any existing cache handle on miss to avoid holding stale handle
151
71
            _page_cache_handle = PageCacheHandle();
152
71
        }
153
191
    }
154
    // NOTE: page cache lookup for *decompressed* page data is handled in
155
    // ColumnChunkReader::load_page_data(). PageReader should only be
156
    // responsible for parsing the header bytes from the file and saving
157
    // them in `_header_buf` for possible later insertion into the cache.
158
71
    while (true) {
159
71
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
160
0
            return Status::EndOfFile("stop");
161
0
        }
162
71
        header_size = std::min(header_size, max_size);
163
71
        {
164
71
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
165
71
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
166
71
        }
167
71
        real_header_size = cast_set<uint32_t>(header_size);
168
71
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
169
71
        auto st =
170
71
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
171
71
        if (st.ok()) {
172
71
            break;
173
71
        }
174
0
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
175
0
            return Status::IOError(
176
0
                    "Failed to deserialize parquet page header. offset: {}, "
177
0
                    "header size: {}, end offset: {}, real header size: {}",
178
0
                    _offset, header_size, _end_offset, real_header_size);
179
0
        }
180
0
        header_size <<= 2;
181
0
    }
182
183
71
    if constexpr (OFFSET_INDEX == false) {
184
71
        if (is_header_v2()) {
185
4
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
186
67
        } else if constexpr (!IN_COLLECTION) {
187
67
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
188
67
        }
189
71
    }
190
191
    // Save header bytes for possible cache insertion later
192
71
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
193
71
    _last_header_size = real_header_size;
194
71
    _page_statistics.parse_page_header_num++;
195
71
    _offset += real_header_size;
196
71
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
197
71
    _state = HEADER_PARSED;
198
71
    return Status::OK();
199
71
}
200
201
template <bool IN_COLLECTION, bool OFFSET_INDEX>
202
71
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
203
71
    if (UNLIKELY(_state != HEADER_PARSED)) {
204
0
        return Status::IOError("Should generate page header first to load current page data");
205
0
    }
206
71
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
207
0
        return Status::EndOfFile("stop");
208
0
    }
209
71
    slice.size = _cur_page_header.compressed_page_size;
210
71
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
211
71
    _offset += slice.size;
212
71
    _state = DATA_LOADED;
213
71
    return Status::OK();
214
71
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE13get_page_dataERNS_5SliceE
_ZN5doris10PageReaderILb1ELb0EE13get_page_dataERNS_5SliceE
Line
Count
Source
202
1
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
203
1
    if (UNLIKELY(_state != HEADER_PARSED)) {
204
0
        return Status::IOError("Should generate page header first to load current page data");
205
0
    }
206
1
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
207
0
        return Status::EndOfFile("stop");
208
0
    }
209
1
    slice.size = _cur_page_header.compressed_page_size;
210
1
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
211
1
    _offset += slice.size;
212
1
    _state = DATA_LOADED;
213
1
    return Status::OK();
214
1
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE13get_page_dataERNS_5SliceE
_ZN5doris10PageReaderILb0ELb0EE13get_page_dataERNS_5SliceE
Line
Count
Source
202
70
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
203
70
    if (UNLIKELY(_state != HEADER_PARSED)) {
204
0
        return Status::IOError("Should generate page header first to load current page data");
205
0
    }
206
70
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
207
0
        return Status::EndOfFile("stop");
208
0
    }
209
70
    slice.size = _cur_page_header.compressed_page_size;
210
70
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
211
70
    _offset += slice.size;
212
70
    _state = DATA_LOADED;
213
70
    return Status::OK();
214
70
}
215
216
template class PageReader<true, true>;
217
template class PageReader<true, false>;
218
template class PageReader<false, true>;
219
template class PageReader<false, false>;
220
#include "common/compile_check_end.h"
221
222
} // namespace doris