Coverage Report

Created: 2026-03-16 14:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_page_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/vparquet_page_reader.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <stddef.h>
23
#include <stdint.h>
24
25
#include <algorithm>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/config.h"
29
#include "format/parquet/parquet_common.h"
30
#include "io/fs/buffered_reader.h"
31
#include "runtime/runtime_profile.h"
32
#include "storage/cache/page_cache.h"
33
#include "util/slice.h"
34
#include "util/thrift_util.h"
35
36
namespace doris {
37
namespace io {
38
struct IOContext;
39
} // namespace io
40
} // namespace doris
41
42
namespace doris {
43
#include "common/compile_check_begin.h"
44
static constexpr size_t INIT_PAGE_HEADER_SIZE = 128;
45
46
163
void ParquetPageCacheKeyBuilder::init(const std::string& path, int64_t mtime) {
47
163
    _file_key_prefix = fmt::format("{}::{}", path, mtime);
48
163
}
49
50
template <bool IN_COLLECTION, bool OFFSET_INDEX>
51
PageReader<IN_COLLECTION, OFFSET_INDEX>::PageReader(io::BufferedStreamReader* reader,
52
                                                    io::IOContext* io_ctx, uint64_t offset,
53
                                                    uint64_t length, size_t total_rows,
54
                                                    const tparquet::ColumnMetaData& metadata,
55
                                                    const ParquetPageReadContext& page_read_ctx,
56
                                                    const tparquet::OffsetIndex* offset_index)
57
163
        : _reader(reader),
58
163
          _io_ctx(io_ctx),
59
163
          _offset(offset),
60
163
          _start_offset(offset),
61
163
          _end_offset(offset + length),
62
163
          _total_rows(total_rows),
63
163
          _metadata(metadata),
64
163
          _page_read_ctx(page_read_ctx),
65
163
          _offset_index(offset_index) {
66
163
    _next_header_offset = _offset;
67
163
    _state = INITIALIZED;
68
163
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
69
70
163
    if constexpr (OFFSET_INDEX) {
71
0
        _end_row = _offset_index->page_locations.size() >= 2
72
0
                           ? _offset_index->page_locations[1].first_row_index
73
0
                           : _total_rows;
74
0
    }
75
163
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
_ZN5doris10PageReaderILb1ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
Line
Count
Source
57
2
        : _reader(reader),
58
2
          _io_ctx(io_ctx),
59
2
          _offset(offset),
60
2
          _start_offset(offset),
61
2
          _end_offset(offset + length),
62
2
          _total_rows(total_rows),
63
2
          _metadata(metadata),
64
2
          _page_read_ctx(page_read_ctx),
65
2
          _offset_index(offset_index) {
66
2
    _next_header_offset = _offset;
67
2
    _state = INITIALIZED;
68
2
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
69
70
    if constexpr (OFFSET_INDEX) {
71
        _end_row = _offset_index->page_locations.size() >= 2
72
                           ? _offset_index->page_locations[1].first_row_index
73
                           : _total_rows;
74
    }
75
2
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
_ZN5doris10PageReaderILb0ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE
Line
Count
Source
57
161
        : _reader(reader),
58
161
          _io_ctx(io_ctx),
59
161
          _offset(offset),
60
161
          _start_offset(offset),
61
161
          _end_offset(offset + length),
62
161
          _total_rows(total_rows),
63
161
          _metadata(metadata),
64
161
          _page_read_ctx(page_read_ctx),
65
161
          _offset_index(offset_index) {
66
161
    _next_header_offset = _offset;
67
161
    _state = INITIALIZED;
68
161
    _page_cache_key_builder.init(_reader->path(), _reader->mtime());
69
70
    if constexpr (OFFSET_INDEX) {
71
        _end_row = _offset_index->page_locations.size() >= 2
72
                           ? _offset_index->page_locations[1].first_row_index
73
                           : _total_rows;
74
    }
75
161
}
76
77
template <bool IN_COLLECTION, bool OFFSET_INDEX>
78
189
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
79
189
    if (_state == HEADER_PARSED) {
80
0
        return Status::OK();
81
0
    }
82
189
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
83
0
        return Status::IOError("Out-of-bounds Access");
84
0
    }
85
189
    if (UNLIKELY(_offset != _next_header_offset)) {
86
0
        return Status::IOError("Wrong header position, should seek to a page header first");
87
0
    }
88
189
    if (UNLIKELY(_state != INITIALIZED)) {
89
0
        return Status::IOError("Should skip or load current page to get next page");
90
0
    }
91
92
189
    _page_statistics.page_read_counter += 1;
93
94
    // Parse page header from file; header bytes are saved for possible cache insertion
95
189
    const uint8_t* page_header_buf = nullptr;
96
189
    size_t max_size = _end_offset - _offset;
97
189
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
98
189
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
99
189
    uint32_t real_header_size = 0;
100
101
    // Try a header-only lookup in the page cache. Cached pages store
102
    // header + optional v2 levels + uncompressed payload, so we can
103
    // parse the page header directly from the cached bytes and avoid
104
    // a file read for the header.
105
189
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
106
189
        StoragePageCache::instance() != nullptr) {
107
189
        PageCacheHandle handle;
108
189
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
109
189
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
110
            // Parse header directly from cached data
111
121
            _page_cache_handle = std::move(handle);
112
121
            Slice s = _page_cache_handle.data();
113
121
            real_header_size = cast_set<uint32_t>(s.size);
114
121
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
115
121
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
116
121
                                             &real_header_size, true, &_cur_page_header);
117
121
            if (!st.ok()) return st;
118
            // Increment page cache counters for a true cache hit on header+payload
119
121
            _page_statistics.page_cache_hit_counter += 1;
120
            // Detect whether the cached payload is compressed or decompressed and record
121
121
            bool is_cache_payload_decompressed =
122
121
                    should_cache_decompressed(&_cur_page_header, _metadata);
123
124
121
            if (is_cache_payload_decompressed) {
125
121
                _page_statistics.page_cache_decompressed_hit_counter += 1;
126
121
            } else {
127
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
128
0
            }
129
130
121
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
131
132
121
            if constexpr (OFFSET_INDEX == false) {
133
121
                if (is_header_v2()) {
134
8
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
135
112
                } else if constexpr (!IN_COLLECTION) {
136
112
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
137
112
                }
138
121
            }
139
140
            // Save header bytes for later use (e.g., to insert updated cache entries)
141
121
            _header_buf.assign(s.data, s.data + real_header_size);
142
121
            _last_header_size = real_header_size;
143
121
            _page_statistics.parse_page_header_num++;
144
121
            _offset += real_header_size;
145
121
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
146
121
            _state = HEADER_PARSED;
147
121
            return Status::OK();
148
121
        } else {
149
68
            _page_statistics.page_cache_missing_counter += 1;
150
            // Clear any existing cache handle on miss to avoid holding stale handle
151
68
            _page_cache_handle = PageCacheHandle();
152
68
        }
153
189
    }
154
    // NOTE: page cache lookup for *decompressed* page data is handled in
155
    // ColumnChunkReader::load_page_data(). PageReader should only be
156
    // responsible for parsing the header bytes from the file and saving
157
    // them in `_header_buf` for possible later insertion into the cache.
158
68
    while (true) {
159
68
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
160
0
            return Status::EndOfFile("stop");
161
0
        }
162
68
        header_size = std::min(header_size, max_size);
163
68
        {
164
68
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
165
68
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
166
68
        }
167
68
        real_header_size = cast_set<uint32_t>(header_size);
168
68
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
169
68
        auto st =
170
68
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
171
68
        if (st.ok()) {
172
68
            break;
173
68
        }
174
0
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
175
0
            return Status::IOError(
176
0
                    "Failed to deserialize parquet page header. offset: {}, "
177
0
                    "header size: {}, end offset: {}, real header size: {}",
178
0
                    _offset, header_size, _end_offset, real_header_size);
179
0
        }
180
0
        header_size <<= 2;
181
0
    }
182
183
68
    if constexpr (OFFSET_INDEX == false) {
184
68
        if (is_header_v2()) {
185
2
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
186
65
        } else if constexpr (!IN_COLLECTION) {
187
65
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
188
65
        }
189
68
    }
190
191
    // Save header bytes for possible cache insertion later
192
68
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
193
68
    _last_header_size = real_header_size;
194
68
    _page_statistics.parse_page_header_num++;
195
68
    _offset += real_header_size;
196
68
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
197
68
    _state = HEADER_PARSED;
198
68
    return Status::OK();
199
68
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE17parse_page_headerEv
_ZN5doris10PageReaderILb1ELb0EE17parse_page_headerEv
Line
Count
Source
78
2
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
79
2
    if (_state == HEADER_PARSED) {
80
0
        return Status::OK();
81
0
    }
82
2
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
83
0
        return Status::IOError("Out-of-bounds Access");
84
0
    }
85
2
    if (UNLIKELY(_offset != _next_header_offset)) {
86
0
        return Status::IOError("Wrong header position, should seek to a page header first");
87
0
    }
88
2
    if (UNLIKELY(_state != INITIALIZED)) {
89
0
        return Status::IOError("Should skip or load current page to get next page");
90
0
    }
91
92
2
    _page_statistics.page_read_counter += 1;
93
94
    // Parse page header from file; header bytes are saved for possible cache insertion
95
2
    const uint8_t* page_header_buf = nullptr;
96
2
    size_t max_size = _end_offset - _offset;
97
2
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
98
2
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
99
2
    uint32_t real_header_size = 0;
100
101
    // Try a header-only lookup in the page cache. Cached pages store
102
    // header + optional v2 levels + uncompressed payload, so we can
103
    // parse the page header directly from the cached bytes and avoid
104
    // a file read for the header.
105
2
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
106
2
        StoragePageCache::instance() != nullptr) {
107
2
        PageCacheHandle handle;
108
2
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
109
2
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
110
            // Parse header directly from cached data
111
1
            _page_cache_handle = std::move(handle);
112
1
            Slice s = _page_cache_handle.data();
113
1
            real_header_size = cast_set<uint32_t>(s.size);
114
1
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
115
1
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
116
1
                                             &real_header_size, true, &_cur_page_header);
117
1
            if (!st.ok()) return st;
118
            // Increment page cache counters for a true cache hit on header+payload
119
1
            _page_statistics.page_cache_hit_counter += 1;
120
            // Detect whether the cached payload is compressed or decompressed and record
121
1
            bool is_cache_payload_decompressed =
122
1
                    should_cache_decompressed(&_cur_page_header, _metadata);
123
124
1
            if (is_cache_payload_decompressed) {
125
1
                _page_statistics.page_cache_decompressed_hit_counter += 1;
126
1
            } else {
127
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
128
0
            }
129
130
1
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
131
132
1
            if constexpr (OFFSET_INDEX == false) {
133
1
                if (is_header_v2()) {
134
0
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
135
1
                } else if constexpr (!IN_COLLECTION) {
136
1
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
137
1
                }
138
1
            }
139
140
            // Save header bytes for later use (e.g., to insert updated cache entries)
141
1
            _header_buf.assign(s.data, s.data + real_header_size);
142
1
            _last_header_size = real_header_size;
143
1
            _page_statistics.parse_page_header_num++;
144
1
            _offset += real_header_size;
145
1
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
146
1
            _state = HEADER_PARSED;
147
1
            return Status::OK();
148
1
        } else {
149
1
            _page_statistics.page_cache_missing_counter += 1;
150
            // Clear any existing cache handle on miss to avoid holding stale handle
151
1
            _page_cache_handle = PageCacheHandle();
152
1
        }
153
2
    }
154
    // NOTE: page cache lookup for *decompressed* page data is handled in
155
    // ColumnChunkReader::load_page_data(). PageReader should only be
156
    // responsible for parsing the header bytes from the file and saving
157
    // them in `_header_buf` for possible later insertion into the cache.
158
1
    while (true) {
159
1
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
160
0
            return Status::EndOfFile("stop");
161
0
        }
162
1
        header_size = std::min(header_size, max_size);
163
1
        {
164
1
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
165
1
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
166
1
        }
167
1
        real_header_size = cast_set<uint32_t>(header_size);
168
1
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
169
1
        auto st =
170
1
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
171
1
        if (st.ok()) {
172
1
            break;
173
1
        }
174
0
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
175
0
            return Status::IOError(
176
0
                    "Failed to deserialize parquet page header. offset: {}, "
177
0
                    "header size: {}, end offset: {}, real header size: {}",
178
0
                    _offset, header_size, _end_offset, real_header_size);
179
0
        }
180
0
        header_size <<= 2;
181
0
    }
182
183
1
    if constexpr (OFFSET_INDEX == false) {
184
1
        if (is_header_v2()) {
185
0
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
186
1
        } else if constexpr (!IN_COLLECTION) {
187
1
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
188
1
        }
189
1
    }
190
191
    // Save header bytes for possible cache insertion later
192
1
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
193
1
    _last_header_size = real_header_size;
194
1
    _page_statistics.parse_page_header_num++;
195
1
    _offset += real_header_size;
196
1
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
197
1
    _state = HEADER_PARSED;
198
1
    return Status::OK();
199
1
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE17parse_page_headerEv
_ZN5doris10PageReaderILb0ELb0EE17parse_page_headerEv
Line
Count
Source
78
187
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() {
79
187
    if (_state == HEADER_PARSED) {
80
0
        return Status::OK();
81
0
    }
82
187
    if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
83
0
        return Status::IOError("Out-of-bounds Access");
84
0
    }
85
187
    if (UNLIKELY(_offset != _next_header_offset)) {
86
0
        return Status::IOError("Wrong header position, should seek to a page header first");
87
0
    }
88
187
    if (UNLIKELY(_state != INITIALIZED)) {
89
0
        return Status::IOError("Should skip or load current page to get next page");
90
0
    }
91
92
187
    _page_statistics.page_read_counter += 1;
93
94
    // Parse page header from file; header bytes are saved for possible cache insertion
95
187
    const uint8_t* page_header_buf = nullptr;
96
187
    size_t max_size = _end_offset - _offset;
97
187
    size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size);
98
187
    const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20;
99
187
    uint32_t real_header_size = 0;
100
101
    // Try a header-only lookup in the page cache. Cached pages store
102
    // header + optional v2 levels + uncompressed payload, so we can
103
    // parse the page header directly from the cached bytes and avoid
104
    // a file read for the header.
105
187
    if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache &&
106
187
        StoragePageCache::instance() != nullptr) {
107
187
        PageCacheHandle handle;
108
187
        StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset));
109
187
        if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) {
110
            // Parse header directly from cached data
111
120
            _page_cache_handle = std::move(handle);
112
120
            Slice s = _page_cache_handle.data();
113
120
            real_header_size = cast_set<uint32_t>(s.size);
114
120
            SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
115
120
            auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data),
116
120
                                             &real_header_size, true, &_cur_page_header);
117
120
            if (!st.ok()) return st;
118
            // Increment page cache counters for a true cache hit on header+payload
119
120
            _page_statistics.page_cache_hit_counter += 1;
120
            // Detect whether the cached payload is compressed or decompressed and record
121
120
            bool is_cache_payload_decompressed =
122
120
                    should_cache_decompressed(&_cur_page_header, _metadata);
123
124
120
            if (is_cache_payload_decompressed) {
125
120
                _page_statistics.page_cache_decompressed_hit_counter += 1;
126
120
            } else {
127
0
                _page_statistics.page_cache_compressed_hit_counter += 1;
128
0
            }
129
130
120
            _is_cache_payload_decompressed = is_cache_payload_decompressed;
131
132
120
            if constexpr (OFFSET_INDEX == false) {
133
120
                if (is_header_v2()) {
134
8
                    _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
135
112
                } else if constexpr (!IN_COLLECTION) {
136
112
                    _end_row = _start_row + _cur_page_header.data_page_header.num_values;
137
112
                }
138
120
            }
139
140
            // Save header bytes for later use (e.g., to insert updated cache entries)
141
120
            _header_buf.assign(s.data, s.data + real_header_size);
142
120
            _last_header_size = real_header_size;
143
120
            _page_statistics.parse_page_header_num++;
144
120
            _offset += real_header_size;
145
120
            _next_header_offset = _offset + _cur_page_header.compressed_page_size;
146
120
            _state = HEADER_PARSED;
147
120
            return Status::OK();
148
120
        } else {
149
67
            _page_statistics.page_cache_missing_counter += 1;
150
            // Clear any existing cache handle on miss to avoid holding stale handle
151
67
            _page_cache_handle = PageCacheHandle();
152
67
        }
153
187
    }
154
    // NOTE: page cache lookup for *decompressed* page data is handled in
155
    // ColumnChunkReader::load_page_data(). PageReader should only be
156
    // responsible for parsing the header bytes from the file and saving
157
    // them in `_header_buf` for possible later insertion into the cache.
158
67
    while (true) {
159
67
        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
160
0
            return Status::EndOfFile("stop");
161
0
        }
162
67
        header_size = std::min(header_size, max_size);
163
67
        {
164
67
            SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time);
165
67
            RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
166
67
        }
167
67
        real_header_size = cast_set<uint32_t>(header_size);
168
67
        SCOPED_RAW_TIMER(&_page_statistics.decode_header_time);
169
67
        auto st =
170
67
                deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header);
171
67
        if (st.ok()) {
172
67
            break;
173
67
        }
174
0
        if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) {
175
0
            return Status::IOError(
176
0
                    "Failed to deserialize parquet page header. offset: {}, "
177
0
                    "header size: {}, end offset: {}, real header size: {}",
178
0
                    _offset, header_size, _end_offset, real_header_size);
179
0
        }
180
0
        header_size <<= 2;
181
0
    }
182
183
67
    if constexpr (OFFSET_INDEX == false) {
184
67
        if (is_header_v2()) {
185
2
            _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows;
186
65
        } else if constexpr (!IN_COLLECTION) {
187
65
            _end_row = _start_row + _cur_page_header.data_page_header.num_values;
188
65
        }
189
67
    }
190
191
    // Save header bytes for possible cache insertion later
192
67
    _header_buf.assign(page_header_buf, page_header_buf + real_header_size);
193
67
    _last_header_size = real_header_size;
194
67
    _page_statistics.parse_page_header_num++;
195
67
    _offset += real_header_size;
196
67
    _next_header_offset = _offset + _cur_page_header.compressed_page_size;
197
67
    _state = HEADER_PARSED;
198
67
    return Status::OK();
199
67
}
200
201
template <bool IN_COLLECTION, bool OFFSET_INDEX>
202
67
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
203
67
    if (UNLIKELY(_state != HEADER_PARSED)) {
204
0
        return Status::IOError("Should generate page header first to load current page data");
205
0
    }
206
67
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
207
0
        return Status::EndOfFile("stop");
208
0
    }
209
67
    slice.size = _cur_page_header.compressed_page_size;
210
67
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
211
67
    _offset += slice.size;
212
67
    _state = DATA_LOADED;
213
67
    return Status::OK();
214
67
}
Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE13get_page_dataERNS_5SliceE
_ZN5doris10PageReaderILb1ELb0EE13get_page_dataERNS_5SliceE
Line
Count
Source
202
1
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
203
1
    if (UNLIKELY(_state != HEADER_PARSED)) {
204
0
        return Status::IOError("Should generate page header first to load current page data");
205
0
    }
206
1
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
207
0
        return Status::EndOfFile("stop");
208
0
    }
209
1
    slice.size = _cur_page_header.compressed_page_size;
210
1
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
211
1
    _offset += slice.size;
212
1
    _state = DATA_LOADED;
213
1
    return Status::OK();
214
1
}
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE13get_page_dataERNS_5SliceE
_ZN5doris10PageReaderILb0ELb0EE13get_page_dataERNS_5SliceE
Line
Count
Source
202
66
Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) {
203
66
    if (UNLIKELY(_state != HEADER_PARSED)) {
204
0
        return Status::IOError("Should generate page header first to load current page data");
205
0
    }
206
66
    if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
207
0
        return Status::EndOfFile("stop");
208
0
    }
209
66
    slice.size = _cur_page_header.compressed_page_size;
210
66
    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
211
66
    _offset += slice.size;
212
66
    _state = DATA_LOADED;
213
66
    return Status::OK();
214
66
}
215
216
template class PageReader<true, true>;
217
template class PageReader<true, false>;
218
template class PageReader<false, true>;
219
template class PageReader<false, false>;
220
#include "common/compile_check_end.h"
221
222
} // namespace doris