be/src/format/parquet/vparquet_page_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/parquet/vparquet_page_reader.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/parquet_types.h> |
22 | | #include <stddef.h> |
23 | | #include <stdint.h> |
24 | | |
25 | | #include <algorithm> |
26 | | |
27 | | #include "common/compiler_util.h" // IWYU pragma: keep |
28 | | #include "common/config.h" |
29 | | #include "format/parquet/parquet_common.h" |
30 | | #include "io/fs/buffered_reader.h" |
31 | | #include "runtime/runtime_profile.h" |
32 | | #include "storage/cache/page_cache.h" |
33 | | #include "util/slice.h" |
34 | | #include "util/thrift_util.h" |
35 | | |
// Forward declaration only: this file passes io::IOContext around by pointer.
namespace doris::io {
struct IOContext;
} // namespace doris::io
41 | | |
42 | | namespace doris { |
// Number of bytes requested by the first file read when probing for a page
// header; parse_page_header() enlarges the window when decoding fails.
43 | | static constexpr size_t INIT_PAGE_HEADER_SIZE = 128; |
44 | | |
45 | 167 | void ParquetPageCacheKeyBuilder::init(const std::string& path, int64_t mtime) { |
46 | 167 | _file_key_prefix = fmt::format("{}::{}", path, mtime); |
47 | 167 | } |
48 | | |
49 | | template <bool IN_COLLECTION, bool OFFSET_INDEX> |
50 | | PageReader<IN_COLLECTION, OFFSET_INDEX>::PageReader(io::BufferedStreamReader* reader, |
51 | | io::IOContext* io_ctx, uint64_t offset, |
52 | | uint64_t length, size_t total_rows, |
53 | | const tparquet::ColumnMetaData& metadata, |
54 | | const ParquetPageReadContext& page_read_ctx, |
55 | | const tparquet::OffsetIndex* offset_index) |
56 | 167 | : _reader(reader), |
57 | 167 | _io_ctx(io_ctx), |
58 | 167 | _offset(offset), |
59 | 167 | _start_offset(offset), |
60 | 167 | _end_offset(offset + length), |
61 | 167 | _total_rows(total_rows), |
62 | 167 | _metadata(metadata), |
63 | 167 | _page_read_ctx(page_read_ctx), |
64 | 167 | _offset_index(offset_index) { |
65 | 167 | _next_header_offset = _offset; |
66 | 167 | _state = INITIALIZED; |
67 | 167 | _page_cache_key_builder.init(_reader->path(), _reader->mtime()); |
68 | | |
69 | 167 | if constexpr (OFFSET_INDEX) { |
70 | 0 | _end_row = _offset_index->page_locations.size() >= 2 |
71 | 0 | ? _offset_index->page_locations[1].first_row_index |
72 | 0 | : _total_rows; |
73 | 0 | } |
74 | 167 | } Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE _ZN5doris10PageReaderILb1ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE Line | Count | Source | 56 | 2 | : _reader(reader), | 57 | 2 | _io_ctx(io_ctx), | 58 | 2 | _offset(offset), | 59 | 2 | _start_offset(offset), | 60 | 2 | _end_offset(offset + length), | 61 | 2 | _total_rows(total_rows), | 62 | 2 | _metadata(metadata), | 63 | 2 | _page_read_ctx(page_read_ctx), | 64 | 2 | _offset_index(offset_index) { | 65 | 2 | _next_header_offset = _offset; | 66 | 2 | _state = INITIALIZED; | 67 | 2 | _page_cache_key_builder.init(_reader->path(), _reader->mtime()); | 68 | | | 69 | | if constexpr (OFFSET_INDEX) { | 70 | | _end_row = _offset_index->page_locations.size() >= 2 | 71 | | ? _offset_index->page_locations[1].first_row_index | 72 | | : _total_rows; | 73 | | } | 74 | 2 | } |
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE _ZN5doris10PageReaderILb0ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE Line | Count | Source | 56 | 165 | : _reader(reader), | 57 | 165 | _io_ctx(io_ctx), | 58 | 165 | _offset(offset), | 59 | 165 | _start_offset(offset), | 60 | 165 | _end_offset(offset + length), | 61 | 165 | _total_rows(total_rows), | 62 | 165 | _metadata(metadata), | 63 | 165 | _page_read_ctx(page_read_ctx), | 64 | 165 | _offset_index(offset_index) { | 65 | 165 | _next_header_offset = _offset; | 66 | 165 | _state = INITIALIZED; | 67 | 165 | _page_cache_key_builder.init(_reader->path(), _reader->mtime()); | 68 | | | 69 | | if constexpr (OFFSET_INDEX) { | 70 | | _end_row = _offset_index->page_locations.size() >= 2 | 71 | | ? _offset_index->page_locations[1].first_row_index | 72 | | : _total_rows; | 73 | | } | 74 | 165 | } |
|
75 | | |
76 | | template <bool IN_COLLECTION, bool OFFSET_INDEX> |
77 | 196 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() { |
78 | 196 | if (_state == HEADER_PARSED) { |
79 | 0 | return Status::OK(); |
80 | 0 | } |
81 | 196 | if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) { |
82 | 0 | return Status::IOError("Out-of-bounds Access"); |
83 | 0 | } |
84 | 196 | if (UNLIKELY(_offset != _next_header_offset)) { |
85 | 0 | return Status::IOError("Wrong header position, should seek to a page header first"); |
86 | 0 | } |
87 | 196 | if (UNLIKELY(_state != INITIALIZED)) { |
88 | 0 | return Status::IOError("Should skip or load current page to get next page"); |
89 | 0 | } |
90 | | |
91 | 196 | _page_statistics.page_read_counter += 1; |
92 | | |
93 | | // Parse page header from file; header bytes are saved for possible cache insertion |
94 | 196 | const uint8_t* page_header_buf = nullptr; |
95 | 196 | size_t max_size = _end_offset - _offset; |
96 | 196 | size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size); |
97 | 196 | const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20; |
98 | 196 | uint32_t real_header_size = 0; |
99 | | |
100 | | // Try a header-only lookup in the page cache. Cached pages store |
101 | | // header + optional v2 levels + uncompressed payload, so we can |
102 | | // parse the page header directly from the cached bytes and avoid |
103 | | // a file read for the header. |
104 | 196 | if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && |
105 | 196 | StoragePageCache::instance() != nullptr) { |
106 | 196 | PageCacheHandle handle; |
107 | 196 | StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset)); |
108 | 196 | if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) { |
109 | | // Parse header directly from cached data |
110 | 121 | _page_cache_handle = std::move(handle); |
111 | 121 | Slice s = _page_cache_handle.data(); |
112 | 121 | real_header_size = cast_set<uint32_t>(s.size); |
113 | 121 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); |
114 | 121 | auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data), |
115 | 121 | &real_header_size, true, &_cur_page_header); |
116 | 121 | if (!st.ok()) return st; |
117 | | // Increment page cache counters for a true cache hit on header+payload |
118 | 121 | _page_statistics.page_cache_hit_counter += 1; |
119 | | // Detect whether the cached payload is compressed or decompressed and record |
120 | 121 | bool is_cache_payload_decompressed = |
121 | 121 | should_cache_decompressed(&_cur_page_header, _metadata); |
122 | | |
123 | 121 | if (is_cache_payload_decompressed) { |
124 | 121 | _page_statistics.page_cache_decompressed_hit_counter += 1; |
125 | 121 | } else { |
126 | 0 | _page_statistics.page_cache_compressed_hit_counter += 1; |
127 | 0 | } |
128 | | |
129 | 121 | _is_cache_payload_decompressed = is_cache_payload_decompressed; |
130 | | |
131 | 121 | if constexpr (OFFSET_INDEX == false) { |
132 | 121 | if (is_header_v2()) { |
133 | 8 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; |
134 | 112 | } else if constexpr (!IN_COLLECTION) { |
135 | 112 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; |
136 | 112 | } |
137 | 121 | } |
138 | | |
139 | | // Save header bytes for later use (e.g., to insert updated cache entries) |
140 | 121 | _header_buf.assign(s.data, s.data + real_header_size); |
141 | 121 | _last_header_size = real_header_size; |
142 | 121 | _page_statistics.parse_page_header_num++; |
143 | 121 | _offset += real_header_size; |
144 | 121 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; |
145 | 121 | _state = HEADER_PARSED; |
146 | 121 | return Status::OK(); |
147 | 121 | } else { |
148 | 75 | _page_statistics.page_cache_missing_counter += 1; |
149 | | // Clear any existing cache handle on miss to avoid holding stale handle |
150 | 75 | _page_cache_handle = PageCacheHandle(); |
151 | 75 | } |
152 | 196 | } |
153 | | // NOTE: page cache lookup for *decompressed* page data is handled in |
154 | | // ColumnChunkReader::load_page_data(). PageReader should only be |
155 | | // responsible for parsing the header bytes from the file and saving |
156 | | // them in `_header_buf` for possible later insertion into the cache. |
157 | 76 | while (true) { |
158 | 76 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { |
159 | 0 | return Status::EndOfFile("stop"); |
160 | 0 | } |
161 | 76 | header_size = std::min(header_size, max_size); |
162 | 76 | { |
163 | 76 | SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time); |
164 | 76 | RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx)); |
165 | 76 | } |
166 | 76 | real_header_size = cast_set<uint32_t>(header_size); |
167 | 76 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); |
168 | 76 | auto st = |
169 | 76 | deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header); |
170 | 76 | if (st.ok()) { |
171 | 75 | break; |
172 | 75 | } |
173 | 1 | if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) { |
174 | 0 | return Status::IOError( |
175 | 0 | "Failed to deserialize parquet page header. offset: {}, " |
176 | 0 | "header size: {}, end offset: {}, real header size: {}", |
177 | 0 | _offset, header_size, _end_offset, real_header_size); |
178 | 0 | } |
179 | 1 | header_size <<= 2; |
180 | 1 | } |
181 | | |
182 | 75 | if constexpr (OFFSET_INDEX == false) { |
183 | 75 | if (is_header_v2()) { |
184 | 4 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; |
185 | 70 | } else if constexpr (!IN_COLLECTION) { |
186 | 70 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; |
187 | 70 | } |
188 | 75 | } |
189 | | |
190 | | // Save header bytes for possible cache insertion later |
191 | 75 | _header_buf.assign(page_header_buf, page_header_buf + real_header_size); |
192 | 75 | _last_header_size = real_header_size; |
193 | 75 | _page_statistics.parse_page_header_num++; |
194 | 75 | _offset += real_header_size; |
195 | 75 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; |
196 | 75 | _state = HEADER_PARSED; |
197 | 75 | return Status::OK(); |
198 | 75 | } Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE17parse_page_headerEv _ZN5doris10PageReaderILb1ELb0EE17parse_page_headerEv Line | Count | Source | 77 | 2 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() { | 78 | 2 | if (_state == HEADER_PARSED) { | 79 | 0 | return Status::OK(); | 80 | 0 | } | 81 | 2 | if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) { | 82 | 0 | return Status::IOError("Out-of-bounds Access"); | 83 | 0 | } | 84 | 2 | if (UNLIKELY(_offset != _next_header_offset)) { | 85 | 0 | return Status::IOError("Wrong header position, should seek to a page header first"); | 86 | 0 | } | 87 | 2 | if (UNLIKELY(_state != INITIALIZED)) { | 88 | 0 | return Status::IOError("Should skip or load current page to get next page"); | 89 | 0 | } | 90 | | | 91 | 2 | _page_statistics.page_read_counter += 1; | 92 | | | 93 | | // Parse page header from file; header bytes are saved for possible cache insertion | 94 | 2 | const uint8_t* page_header_buf = nullptr; | 95 | 2 | size_t max_size = _end_offset - _offset; | 96 | 2 | size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size); | 97 | 2 | const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20; | 98 | 2 | uint32_t real_header_size = 0; | 99 | | | 100 | | // Try a header-only lookup in the page cache. Cached pages store | 101 | | // header + optional v2 levels + uncompressed payload, so we can | 102 | | // parse the page header directly from the cached bytes and avoid | 103 | | // a file read for the header. 
| 104 | 2 | if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && | 105 | 2 | StoragePageCache::instance() != nullptr) { | 106 | 2 | PageCacheHandle handle; | 107 | 2 | StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset)); | 108 | 2 | if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) { | 109 | | // Parse header directly from cached data | 110 | 1 | _page_cache_handle = std::move(handle); | 111 | 1 | Slice s = _page_cache_handle.data(); | 112 | 1 | real_header_size = cast_set<uint32_t>(s.size); | 113 | 1 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); | 114 | 1 | auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data), | 115 | 1 | &real_header_size, true, &_cur_page_header); | 116 | 1 | if (!st.ok()) return st; | 117 | | // Increment page cache counters for a true cache hit on header+payload | 118 | 1 | _page_statistics.page_cache_hit_counter += 1; | 119 | | // Detect whether the cached payload is compressed or decompressed and record | 120 | 1 | bool is_cache_payload_decompressed = | 121 | 1 | should_cache_decompressed(&_cur_page_header, _metadata); | 122 | | | 123 | 1 | if (is_cache_payload_decompressed) { | 124 | 1 | _page_statistics.page_cache_decompressed_hit_counter += 1; | 125 | 1 | } else { | 126 | 0 | _page_statistics.page_cache_compressed_hit_counter += 1; | 127 | 0 | } | 128 | | | 129 | 1 | _is_cache_payload_decompressed = is_cache_payload_decompressed; | 130 | | | 131 | 1 | if constexpr (OFFSET_INDEX == false) { | 132 | 1 | if (is_header_v2()) { | 133 | 0 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; | 134 | 1 | } else if constexpr (!IN_COLLECTION) { | 135 | 1 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; | 136 | 1 | } | 137 | 1 | } | 138 | | | 139 | | // Save header bytes for later use (e.g., to insert updated cache entries) | 140 | 1 | _header_buf.assign(s.data, s.data + 
real_header_size); | 141 | 1 | _last_header_size = real_header_size; | 142 | 1 | _page_statistics.parse_page_header_num++; | 143 | 1 | _offset += real_header_size; | 144 | 1 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; | 145 | 1 | _state = HEADER_PARSED; | 146 | 1 | return Status::OK(); | 147 | 1 | } else { | 148 | 1 | _page_statistics.page_cache_missing_counter += 1; | 149 | | // Clear any existing cache handle on miss to avoid holding stale handle | 150 | 1 | _page_cache_handle = PageCacheHandle(); | 151 | 1 | } | 152 | 2 | } | 153 | | // NOTE: page cache lookup for *decompressed* page data is handled in | 154 | | // ColumnChunkReader::load_page_data(). PageReader should only be | 155 | | // responsible for parsing the header bytes from the file and saving | 156 | | // them in `_header_buf` for possible later insertion into the cache. | 157 | 1 | while (true) { | 158 | 1 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { | 159 | 0 | return Status::EndOfFile("stop"); | 160 | 0 | } | 161 | 1 | header_size = std::min(header_size, max_size); | 162 | 1 | { | 163 | 1 | SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time); | 164 | 1 | RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx)); | 165 | 1 | } | 166 | 1 | real_header_size = cast_set<uint32_t>(header_size); | 167 | 1 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); | 168 | 1 | auto st = | 169 | 1 | deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header); | 170 | 1 | if (st.ok()) { | 171 | 1 | break; | 172 | 1 | } | 173 | 0 | if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) { | 174 | 0 | return Status::IOError( | 175 | 0 | "Failed to deserialize parquet page header. 
offset: {}, " | 176 | 0 | "header size: {}, end offset: {}, real header size: {}", | 177 | 0 | _offset, header_size, _end_offset, real_header_size); | 178 | 0 | } | 179 | 0 | header_size <<= 2; | 180 | 0 | } | 181 | | | 182 | 1 | if constexpr (OFFSET_INDEX == false) { | 183 | 1 | if (is_header_v2()) { | 184 | 0 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; | 185 | 1 | } else if constexpr (!IN_COLLECTION) { | 186 | 1 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; | 187 | 1 | } | 188 | 1 | } | 189 | | | 190 | | // Save header bytes for possible cache insertion later | 191 | 1 | _header_buf.assign(page_header_buf, page_header_buf + real_header_size); | 192 | 1 | _last_header_size = real_header_size; | 193 | 1 | _page_statistics.parse_page_header_num++; | 194 | 1 | _offset += real_header_size; | 195 | 1 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; | 196 | 1 | _state = HEADER_PARSED; | 197 | 1 | return Status::OK(); | 198 | 1 | } |
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE17parse_page_headerEv _ZN5doris10PageReaderILb0ELb0EE17parse_page_headerEv Line | Count | Source | 77 | 194 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() { | 78 | 194 | if (_state == HEADER_PARSED) { | 79 | 0 | return Status::OK(); | 80 | 0 | } | 81 | 194 | if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) { | 82 | 0 | return Status::IOError("Out-of-bounds Access"); | 83 | 0 | } | 84 | 194 | if (UNLIKELY(_offset != _next_header_offset)) { | 85 | 0 | return Status::IOError("Wrong header position, should seek to a page header first"); | 86 | 0 | } | 87 | 194 | if (UNLIKELY(_state != INITIALIZED)) { | 88 | 0 | return Status::IOError("Should skip or load current page to get next page"); | 89 | 0 | } | 90 | | | 91 | 194 | _page_statistics.page_read_counter += 1; | 92 | | | 93 | | // Parse page header from file; header bytes are saved for possible cache insertion | 94 | 194 | const uint8_t* page_header_buf = nullptr; | 95 | 194 | size_t max_size = _end_offset - _offset; | 96 | 194 | size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size); | 97 | 194 | const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20; | 98 | 194 | uint32_t real_header_size = 0; | 99 | | | 100 | | // Try a header-only lookup in the page cache. Cached pages store | 101 | | // header + optional v2 levels + uncompressed payload, so we can | 102 | | // parse the page header directly from the cached bytes and avoid | 103 | | // a file read for the header. 
| 104 | 194 | if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && | 105 | 194 | StoragePageCache::instance() != nullptr) { | 106 | 194 | PageCacheHandle handle; | 107 | 194 | StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset)); | 108 | 194 | if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) { | 109 | | // Parse header directly from cached data | 110 | 120 | _page_cache_handle = std::move(handle); | 111 | 120 | Slice s = _page_cache_handle.data(); | 112 | 120 | real_header_size = cast_set<uint32_t>(s.size); | 113 | 120 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); | 114 | 120 | auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data), | 115 | 120 | &real_header_size, true, &_cur_page_header); | 116 | 120 | if (!st.ok()) return st; | 117 | | // Increment page cache counters for a true cache hit on header+payload | 118 | 120 | _page_statistics.page_cache_hit_counter += 1; | 119 | | // Detect whether the cached payload is compressed or decompressed and record | 120 | 120 | bool is_cache_payload_decompressed = | 121 | 120 | should_cache_decompressed(&_cur_page_header, _metadata); | 122 | | | 123 | 120 | if (is_cache_payload_decompressed) { | 124 | 120 | _page_statistics.page_cache_decompressed_hit_counter += 1; | 125 | 120 | } else { | 126 | 0 | _page_statistics.page_cache_compressed_hit_counter += 1; | 127 | 0 | } | 128 | | | 129 | 120 | _is_cache_payload_decompressed = is_cache_payload_decompressed; | 130 | | | 131 | 120 | if constexpr (OFFSET_INDEX == false) { | 132 | 120 | if (is_header_v2()) { | 133 | 8 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; | 134 | 112 | } else if constexpr (!IN_COLLECTION) { | 135 | 112 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; | 136 | 112 | } | 137 | 120 | } | 138 | | | 139 | | // Save header bytes for later use (e.g., to insert updated cache entries) | 
140 | 120 | _header_buf.assign(s.data, s.data + real_header_size); | 141 | 120 | _last_header_size = real_header_size; | 142 | 120 | _page_statistics.parse_page_header_num++; | 143 | 120 | _offset += real_header_size; | 144 | 120 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; | 145 | 120 | _state = HEADER_PARSED; | 146 | 120 | return Status::OK(); | 147 | 120 | } else { | 148 | 74 | _page_statistics.page_cache_missing_counter += 1; | 149 | | // Clear any existing cache handle on miss to avoid holding stale handle | 150 | 74 | _page_cache_handle = PageCacheHandle(); | 151 | 74 | } | 152 | 194 | } | 153 | | // NOTE: page cache lookup for *decompressed* page data is handled in | 154 | | // ColumnChunkReader::load_page_data(). PageReader should only be | 155 | | // responsible for parsing the header bytes from the file and saving | 156 | | // them in `_header_buf` for possible later insertion into the cache. | 157 | 75 | while (true) { | 158 | 75 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { | 159 | 0 | return Status::EndOfFile("stop"); | 160 | 0 | } | 161 | 75 | header_size = std::min(header_size, max_size); | 162 | 75 | { | 163 | 75 | SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time); | 164 | 75 | RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx)); | 165 | 75 | } | 166 | 75 | real_header_size = cast_set<uint32_t>(header_size); | 167 | 75 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); | 168 | 75 | auto st = | 169 | 75 | deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header); | 170 | 75 | if (st.ok()) { | 171 | 74 | break; | 172 | 74 | } | 173 | 1 | if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) { | 174 | 0 | return Status::IOError( | 175 | 0 | "Failed to deserialize parquet page header. 
offset: {}, " | 176 | 0 | "header size: {}, end offset: {}, real header size: {}", | 177 | 0 | _offset, header_size, _end_offset, real_header_size); | 178 | 0 | } | 179 | 1 | header_size <<= 2; | 180 | 1 | } | 181 | | | 182 | 74 | if constexpr (OFFSET_INDEX == false) { | 183 | 74 | if (is_header_v2()) { | 184 | 4 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; | 185 | 70 | } else if constexpr (!IN_COLLECTION) { | 186 | 70 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; | 187 | 70 | } | 188 | 74 | } | 189 | | | 190 | | // Save header bytes for possible cache insertion later | 191 | 74 | _header_buf.assign(page_header_buf, page_header_buf + real_header_size); | 192 | 74 | _last_header_size = real_header_size; | 193 | 74 | _page_statistics.parse_page_header_num++; | 194 | 74 | _offset += real_header_size; | 195 | 74 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; | 196 | 74 | _state = HEADER_PARSED; | 197 | 74 | return Status::OK(); | 198 | 74 | } |
|
199 | | |
200 | | template <bool IN_COLLECTION, bool OFFSET_INDEX> |
201 | 74 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) { |
202 | 74 | if (UNLIKELY(_state != HEADER_PARSED)) { |
203 | 0 | return Status::IOError("Should generate page header first to load current page data"); |
204 | 0 | } |
205 | 74 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { |
206 | 0 | return Status::EndOfFile("stop"); |
207 | 0 | } |
208 | 74 | slice.size = _cur_page_header.compressed_page_size; |
209 | 74 | RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx)); |
210 | 74 | _offset += slice.size; |
211 | 74 | _state = DATA_LOADED; |
212 | 74 | return Status::OK(); |
213 | 74 | } Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE13get_page_dataERNS_5SliceE _ZN5doris10PageReaderILb1ELb0EE13get_page_dataERNS_5SliceE Line | Count | Source | 201 | 1 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) { | 202 | 1 | if (UNLIKELY(_state != HEADER_PARSED)) { | 203 | 0 | return Status::IOError("Should generate page header first to load current page data"); | 204 | 0 | } | 205 | 1 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { | 206 | 0 | return Status::EndOfFile("stop"); | 207 | 0 | } | 208 | 1 | slice.size = _cur_page_header.compressed_page_size; | 209 | 1 | RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx)); | 210 | 1 | _offset += slice.size; | 211 | 1 | _state = DATA_LOADED; | 212 | 1 | return Status::OK(); | 213 | 1 | } |
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE13get_page_dataERNS_5SliceE _ZN5doris10PageReaderILb0ELb0EE13get_page_dataERNS_5SliceE Line | Count | Source | 201 | 73 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) { | 202 | 73 | if (UNLIKELY(_state != HEADER_PARSED)) { | 203 | 0 | return Status::IOError("Should generate page header first to load current page data"); | 204 | 0 | } | 205 | 73 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { | 206 | 0 | return Status::EndOfFile("stop"); | 207 | 0 | } | 208 | 73 | slice.size = _cur_page_header.compressed_page_size; | 209 | 73 | RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx)); | 210 | 73 | _offset += slice.size; | 211 | 73 | _state = DATA_LOADED; | 212 | 73 | return Status::OK(); | 213 | 73 | } |
|
214 | | |
215 | | template class PageReader<true, true>; |
216 | | template class PageReader<true, false>; |
217 | | template class PageReader<false, true>; |
218 | | template class PageReader<false, false>; |
219 | | |
220 | | } // namespace doris |