be/src/format/parquet/vparquet_page_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/parquet/vparquet_page_reader.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/parquet_types.h> |
22 | | #include <stddef.h> |
23 | | #include <stdint.h> |
24 | | |
25 | | #include <algorithm> |
26 | | |
27 | | #include "common/compiler_util.h" // IWYU pragma: keep |
28 | | #include "common/config.h" |
29 | | #include "format/parquet/parquet_common.h" |
30 | | #include "io/fs/buffered_reader.h" |
31 | | #include "runtime/runtime_profile.h" |
32 | | #include "storage/cache/page_cache.h" |
33 | | #include "util/slice.h" |
34 | | #include "util/thrift_util.h" |
35 | | |
36 | | namespace doris { |
37 | | namespace io { |
38 | | struct IOContext; |
39 | | } // namespace io |
40 | | } // namespace doris |
41 | | |
42 | | namespace doris { |
43 | | #include "common/compile_check_begin.h" |
44 | | static constexpr size_t INIT_PAGE_HEADER_SIZE = 128; |
45 | | |
46 | 163 | void ParquetPageCacheKeyBuilder::init(const std::string& path, int64_t mtime) { |
47 | 163 | _file_key_prefix = fmt::format("{}::{}", path, mtime); |
48 | 163 | } |
49 | | |
50 | | template <bool IN_COLLECTION, bool OFFSET_INDEX> |
51 | | PageReader<IN_COLLECTION, OFFSET_INDEX>::PageReader(io::BufferedStreamReader* reader, |
52 | | io::IOContext* io_ctx, uint64_t offset, |
53 | | uint64_t length, size_t total_rows, |
54 | | const tparquet::ColumnMetaData& metadata, |
55 | | const ParquetPageReadContext& page_read_ctx, |
56 | | const tparquet::OffsetIndex* offset_index) |
57 | 163 | : _reader(reader), |
58 | 163 | _io_ctx(io_ctx), |
59 | 163 | _offset(offset), |
60 | 163 | _start_offset(offset), |
61 | 163 | _end_offset(offset + length), |
62 | 163 | _total_rows(total_rows), |
63 | 163 | _metadata(metadata), |
64 | 163 | _page_read_ctx(page_read_ctx), |
65 | 163 | _offset_index(offset_index) { |
66 | 163 | _next_header_offset = _offset; |
67 | 163 | _state = INITIALIZED; |
68 | 163 | _page_cache_key_builder.init(_reader->path(), _reader->mtime()); |
69 | | |
70 | 163 | if constexpr (OFFSET_INDEX) { |
71 | 0 | _end_row = _offset_index->page_locations.size() >= 2 |
72 | 0 | ? _offset_index->page_locations[1].first_row_index |
73 | 0 | : _total_rows; |
74 | 0 | } |
75 | 163 | } Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE _ZN5doris10PageReaderILb1ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE Line | Count | Source | 57 | 2 | : _reader(reader), | 58 | 2 | _io_ctx(io_ctx), | 59 | 2 | _offset(offset), | 60 | 2 | _start_offset(offset), | 61 | 2 | _end_offset(offset + length), | 62 | 2 | _total_rows(total_rows), | 63 | 2 | _metadata(metadata), | 64 | 2 | _page_read_ctx(page_read_ctx), | 65 | 2 | _offset_index(offset_index) { | 66 | 2 | _next_header_offset = _offset; | 67 | 2 | _state = INITIALIZED; | 68 | 2 | _page_cache_key_builder.init(_reader->path(), _reader->mtime()); | 69 | | | 70 | | if constexpr (OFFSET_INDEX) { | 71 | | _end_row = _offset_index->page_locations.size() >= 2 | 72 | | ? _offset_index->page_locations[1].first_row_index | 73 | | : _total_rows; | 74 | | } | 75 | 2 | } |
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE _ZN5doris10PageReaderILb0ELb0EEC2EPNS_2io20BufferedStreamReaderEPNS2_9IOContextEmmmRKN8tparquet14ColumnMetaDataERKNS_22ParquetPageReadContextEPKNS7_11OffsetIndexE Line | Count | Source | 57 | 161 | : _reader(reader), | 58 | 161 | _io_ctx(io_ctx), | 59 | 161 | _offset(offset), | 60 | 161 | _start_offset(offset), | 61 | 161 | _end_offset(offset + length), | 62 | 161 | _total_rows(total_rows), | 63 | 161 | _metadata(metadata), | 64 | 161 | _page_read_ctx(page_read_ctx), | 65 | 161 | _offset_index(offset_index) { | 66 | 161 | _next_header_offset = _offset; | 67 | 161 | _state = INITIALIZED; | 68 | 161 | _page_cache_key_builder.init(_reader->path(), _reader->mtime()); | 69 | | | 70 | | if constexpr (OFFSET_INDEX) { | 71 | | _end_row = _offset_index->page_locations.size() >= 2 | 72 | | ? _offset_index->page_locations[1].first_row_index | 73 | | : _total_rows; | 74 | | } | 75 | 161 | } |
|
76 | | |
77 | | template <bool IN_COLLECTION, bool OFFSET_INDEX> |
78 | 189 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() { |
79 | 189 | if (_state == HEADER_PARSED) { |
80 | 0 | return Status::OK(); |
81 | 0 | } |
82 | 189 | if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) { |
83 | 0 | return Status::IOError("Out-of-bounds Access"); |
84 | 0 | } |
85 | 189 | if (UNLIKELY(_offset != _next_header_offset)) { |
86 | 0 | return Status::IOError("Wrong header position, should seek to a page header first"); |
87 | 0 | } |
88 | 189 | if (UNLIKELY(_state != INITIALIZED)) { |
89 | 0 | return Status::IOError("Should skip or load current page to get next page"); |
90 | 0 | } |
91 | | |
92 | 189 | _page_statistics.page_read_counter += 1; |
93 | | |
94 | | // Parse page header from file; header bytes are saved for possible cache insertion |
95 | 189 | const uint8_t* page_header_buf = nullptr; |
96 | 189 | size_t max_size = _end_offset - _offset; |
97 | 189 | size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size); |
98 | 189 | const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20; |
99 | 189 | uint32_t real_header_size = 0; |
100 | | |
101 | | // Try a header-only lookup in the page cache. Cached pages store |
102 | | // header + optional v2 levels + uncompressed payload, so we can |
103 | | // parse the page header directly from the cached bytes and avoid |
104 | | // a file read for the header. |
105 | 189 | if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && |
106 | 189 | StoragePageCache::instance() != nullptr) { |
107 | 189 | PageCacheHandle handle; |
108 | 189 | StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset)); |
109 | 189 | if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) { |
110 | | // Parse header directly from cached data |
111 | 121 | _page_cache_handle = std::move(handle); |
112 | 121 | Slice s = _page_cache_handle.data(); |
113 | 121 | real_header_size = cast_set<uint32_t>(s.size); |
114 | 121 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); |
115 | 121 | auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data), |
116 | 121 | &real_header_size, true, &_cur_page_header); |
117 | 121 | if (!st.ok()) return st; |
118 | | // Increment page cache counters for a true cache hit on header+payload |
119 | 121 | _page_statistics.page_cache_hit_counter += 1; |
120 | | // Detect whether the cached payload is compressed or decompressed and record |
121 | 121 | bool is_cache_payload_decompressed = |
122 | 121 | should_cache_decompressed(&_cur_page_header, _metadata); |
123 | | |
124 | 121 | if (is_cache_payload_decompressed) { |
125 | 121 | _page_statistics.page_cache_decompressed_hit_counter += 1; |
126 | 121 | } else { |
127 | 0 | _page_statistics.page_cache_compressed_hit_counter += 1; |
128 | 0 | } |
129 | | |
130 | 121 | _is_cache_payload_decompressed = is_cache_payload_decompressed; |
131 | | |
132 | 121 | if constexpr (OFFSET_INDEX == false) { |
133 | 121 | if (is_header_v2()) { |
134 | 8 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; |
135 | 112 | } else if constexpr (!IN_COLLECTION) { |
136 | 112 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; |
137 | 112 | } |
138 | 121 | } |
139 | | |
140 | | // Save header bytes for later use (e.g., to insert updated cache entries) |
141 | 121 | _header_buf.assign(s.data, s.data + real_header_size); |
142 | 121 | _last_header_size = real_header_size; |
143 | 121 | _page_statistics.parse_page_header_num++; |
144 | 121 | _offset += real_header_size; |
145 | 121 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; |
146 | 121 | _state = HEADER_PARSED; |
147 | 121 | return Status::OK(); |
148 | 121 | } else { |
149 | 68 | _page_statistics.page_cache_missing_counter += 1; |
150 | | // Clear any existing cache handle on miss to avoid holding stale handle |
151 | 68 | _page_cache_handle = PageCacheHandle(); |
152 | 68 | } |
153 | 189 | } |
154 | | // NOTE: page cache lookup for *decompressed* page data is handled in |
155 | | // ColumnChunkReader::load_page_data(). PageReader should only be |
156 | | // responsible for parsing the header bytes from the file and saving |
157 | | // them in `_header_buf` for possible later insertion into the cache. |
158 | 68 | while (true) { |
159 | 68 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { |
160 | 0 | return Status::EndOfFile("stop"); |
161 | 0 | } |
162 | 68 | header_size = std::min(header_size, max_size); |
163 | 68 | { |
164 | 68 | SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time); |
165 | 68 | RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx)); |
166 | 68 | } |
167 | 68 | real_header_size = cast_set<uint32_t>(header_size); |
168 | 68 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); |
169 | 68 | auto st = |
170 | 68 | deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header); |
171 | 68 | if (st.ok()) { |
172 | 68 | break; |
173 | 68 | } |
174 | 0 | if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) { |
175 | 0 | return Status::IOError( |
176 | 0 | "Failed to deserialize parquet page header. offset: {}, " |
177 | 0 | "header size: {}, end offset: {}, real header size: {}", |
178 | 0 | _offset, header_size, _end_offset, real_header_size); |
179 | 0 | } |
180 | 0 | header_size <<= 2; |
181 | 0 | } |
182 | | |
183 | 68 | if constexpr (OFFSET_INDEX == false) { |
184 | 68 | if (is_header_v2()) { |
185 | 2 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; |
186 | 65 | } else if constexpr (!IN_COLLECTION) { |
187 | 65 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; |
188 | 65 | } |
189 | 68 | } |
190 | | |
191 | | // Save header bytes for possible cache insertion later |
192 | 68 | _header_buf.assign(page_header_buf, page_header_buf + real_header_size); |
193 | 68 | _last_header_size = real_header_size; |
194 | 68 | _page_statistics.parse_page_header_num++; |
195 | 68 | _offset += real_header_size; |
196 | 68 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; |
197 | 68 | _state = HEADER_PARSED; |
198 | 68 | return Status::OK(); |
199 | 68 | } Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE17parse_page_headerEv _ZN5doris10PageReaderILb1ELb0EE17parse_page_headerEv Line | Count | Source | 78 | 2 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() { | 79 | 2 | if (_state == HEADER_PARSED) { | 80 | 0 | return Status::OK(); | 81 | 0 | } | 82 | 2 | if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) { | 83 | 0 | return Status::IOError("Out-of-bounds Access"); | 84 | 0 | } | 85 | 2 | if (UNLIKELY(_offset != _next_header_offset)) { | 86 | 0 | return Status::IOError("Wrong header position, should seek to a page header first"); | 87 | 0 | } | 88 | 2 | if (UNLIKELY(_state != INITIALIZED)) { | 89 | 0 | return Status::IOError("Should skip or load current page to get next page"); | 90 | 0 | } | 91 | | | 92 | 2 | _page_statistics.page_read_counter += 1; | 93 | | | 94 | | // Parse page header from file; header bytes are saved for possible cache insertion | 95 | 2 | const uint8_t* page_header_buf = nullptr; | 96 | 2 | size_t max_size = _end_offset - _offset; | 97 | 2 | size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size); | 98 | 2 | const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20; | 99 | 2 | uint32_t real_header_size = 0; | 100 | | | 101 | | // Try a header-only lookup in the page cache. Cached pages store | 102 | | // header + optional v2 levels + uncompressed payload, so we can | 103 | | // parse the page header directly from the cached bytes and avoid | 104 | | // a file read for the header. | 105 | 2 | if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && | 106 | 2 | StoragePageCache::instance() != nullptr) { | 107 | 2 | PageCacheHandle handle; | 108 | 2 | StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset)); | 109 | 2 | if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) { | 110 | | // Parse header directly from cached data | 111 | 1 | _page_cache_handle = std::move(handle); | 112 | 1 | Slice s = _page_cache_handle.data(); | 113 | 1 | real_header_size = cast_set<uint32_t>(s.size); | 114 | 1 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); | 115 | 1 | auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data), | 116 | 1 | &real_header_size, true, &_cur_page_header); | 117 | 1 | if (!st.ok()) return st; | 118 | | // Increment page cache counters for a true cache hit on header+payload | 119 | 1 | _page_statistics.page_cache_hit_counter += 1; | 120 | | // Detect whether the cached payload is compressed or decompressed and record | 121 | 1 | bool is_cache_payload_decompressed = | 122 | 1 | should_cache_decompressed(&_cur_page_header, _metadata); | 123 | | | 124 | 1 | if (is_cache_payload_decompressed) { | 125 | 1 | _page_statistics.page_cache_decompressed_hit_counter += 1; | 126 | 1 | } else { | 127 | 0 | _page_statistics.page_cache_compressed_hit_counter += 1; | 128 | 0 | } | 129 | | | 130 | 1 | _is_cache_payload_decompressed = is_cache_payload_decompressed; | 131 | | | 132 | 1 | if constexpr (OFFSET_INDEX == false) { | 133 | 1 | if (is_header_v2()) { | 134 | 0 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; | 135 | 1 | } else if constexpr (!IN_COLLECTION) { | 136 | 1 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; | 137 | 1 | } | 138 | 1 | } | 139 | | | 140 | | // Save header bytes for later use (e.g., to insert updated cache entries) | 141 | 1 | _header_buf.assign(s.data, s.data + real_header_size); | 142 | 1 | _last_header_size = real_header_size; | 143 | 1 | _page_statistics.parse_page_header_num++; | 144 | 1 | _offset += real_header_size; | 145 | 1 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; | 146 | 1 | _state = HEADER_PARSED; | 147 | 1 | return Status::OK(); | 148 | 1 | } else { | 149 | 1 | _page_statistics.page_cache_missing_counter += 1; | 150 | | // Clear any existing cache handle on miss to avoid holding stale handle | 151 | 1 | _page_cache_handle = PageCacheHandle(); | 152 | 1 | } | 153 | 2 | } | 154 | | // NOTE: page cache lookup for *decompressed* page data is handled in | 155 | | // ColumnChunkReader::load_page_data(). PageReader should only be | 156 | | // responsible for parsing the header bytes from the file and saving | 157 | | // them in `_header_buf` for possible later insertion into the cache. | 158 | 1 | while (true) { | 159 | 1 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { | 160 | 0 | return Status::EndOfFile("stop"); | 161 | 0 | } | 162 | 1 | header_size = std::min(header_size, max_size); | 163 | 1 | { | 164 | 1 | SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time); | 165 | 1 | RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx)); | 166 | 1 | } | 167 | 1 | real_header_size = cast_set<uint32_t>(header_size); | 168 | 1 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); | 169 | 1 | auto st = | 170 | 1 | deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header); | 171 | 1 | if (st.ok()) { | 172 | 1 | break; | 173 | 1 | } | 174 | 0 | if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) { | 175 | 0 | return Status::IOError( | 176 | 0 | "Failed to deserialize parquet page header. offset: {}, " | 177 | 0 | "header size: {}, end offset: {}, real header size: {}", | 178 | 0 | _offset, header_size, _end_offset, real_header_size); | 179 | 0 | } | 180 | 0 | header_size <<= 2; | 181 | 0 | } | 182 | | | 183 | 1 | if constexpr (OFFSET_INDEX == false) { | 184 | 1 | if (is_header_v2()) { | 185 | 0 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; | 186 | 1 | } else if constexpr (!IN_COLLECTION) { | 187 | 1 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; | 188 | 1 | } | 189 | 1 | } | 190 | | | 191 | | // Save header bytes for possible cache insertion later | 192 | 1 | _header_buf.assign(page_header_buf, page_header_buf + real_header_size); | 193 | 1 | _last_header_size = real_header_size; | 194 | 1 | _page_statistics.parse_page_header_num++; | 195 | 1 | _offset += real_header_size; | 196 | 1 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; | 197 | 1 | _state = HEADER_PARSED; | 198 | 1 | return Status::OK(); | 199 | 1 | } |
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE17parse_page_headerEv _ZN5doris10PageReaderILb0ELb0EE17parse_page_headerEv Line | Count | Source | 78 | 187 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::parse_page_header() { | 79 | 187 | if (_state == HEADER_PARSED) { | 80 | 0 | return Status::OK(); | 81 | 0 | } | 82 | 187 | if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) { | 83 | 0 | return Status::IOError("Out-of-bounds Access"); | 84 | 0 | } | 85 | 187 | if (UNLIKELY(_offset != _next_header_offset)) { | 86 | 0 | return Status::IOError("Wrong header position, should seek to a page header first"); | 87 | 0 | } | 88 | 187 | if (UNLIKELY(_state != INITIALIZED)) { | 89 | 0 | return Status::IOError("Should skip or load current page to get next page"); | 90 | 0 | } | 91 | | | 92 | 187 | _page_statistics.page_read_counter += 1; | 93 | | | 94 | | // Parse page header from file; header bytes are saved for possible cache insertion | 95 | 187 | const uint8_t* page_header_buf = nullptr; | 96 | 187 | size_t max_size = _end_offset - _offset; | 97 | 187 | size_t header_size = std::min(INIT_PAGE_HEADER_SIZE, max_size); | 98 | 187 | const size_t MAX_PAGE_HEADER_SIZE = config::parquet_header_max_size_mb << 20; | 99 | 187 | uint32_t real_header_size = 0; | 100 | | | 101 | | // Try a header-only lookup in the page cache. Cached pages store | 102 | | // header + optional v2 levels + uncompressed payload, so we can | 103 | | // parse the page header directly from the cached bytes and avoid | 104 | | // a file read for the header. | 105 | 187 | if (_page_read_ctx.enable_parquet_file_page_cache && !config::disable_storage_page_cache && | 106 | 187 | StoragePageCache::instance() != nullptr) { | 107 | 187 | PageCacheHandle handle; | 108 | 187 | StoragePageCache::CacheKey key = make_page_cache_key(static_cast<int64_t>(_offset)); | 109 | 187 | if (StoragePageCache::instance()->lookup(key, &handle, segment_v2::DATA_PAGE)) { | 110 | | // Parse header directly from cached data | 111 | 120 | _page_cache_handle = std::move(handle); | 112 | 120 | Slice s = _page_cache_handle.data(); | 113 | 120 | real_header_size = cast_set<uint32_t>(s.size); | 114 | 120 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); | 115 | 120 | auto st = deserialize_thrift_msg(reinterpret_cast<const uint8_t*>(s.data), | 116 | 120 | &real_header_size, true, &_cur_page_header); | 117 | 120 | if (!st.ok()) return st; | 118 | | // Increment page cache counters for a true cache hit on header+payload | 119 | 120 | _page_statistics.page_cache_hit_counter += 1; | 120 | | // Detect whether the cached payload is compressed or decompressed and record | 121 | 120 | bool is_cache_payload_decompressed = | 122 | 120 | should_cache_decompressed(&_cur_page_header, _metadata); | 123 | | | 124 | 120 | if (is_cache_payload_decompressed) { | 125 | 120 | _page_statistics.page_cache_decompressed_hit_counter += 1; | 126 | 120 | } else { | 127 | 0 | _page_statistics.page_cache_compressed_hit_counter += 1; | 128 | 0 | } | 129 | | | 130 | 120 | _is_cache_payload_decompressed = is_cache_payload_decompressed; | 131 | | | 132 | 120 | if constexpr (OFFSET_INDEX == false) { | 133 | 120 | if (is_header_v2()) { | 134 | 8 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; | 135 | 112 | } else if constexpr (!IN_COLLECTION) { | 136 | 112 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; | 137 | 112 | } | 138 | 120 | } | 139 | | | 140 | | // Save header bytes for later use (e.g., to insert updated cache entries) | 141 | 120 | _header_buf.assign(s.data, s.data + real_header_size); | 142 | 120 | _last_header_size = real_header_size; | 143 | 120 | _page_statistics.parse_page_header_num++; | 144 | 120 | _offset += real_header_size; | 145 | 120 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; | 146 | 120 | _state = HEADER_PARSED; | 147 | 120 | return Status::OK(); | 148 | 120 | } else { | 149 | 67 | _page_statistics.page_cache_missing_counter += 1; | 150 | | // Clear any existing cache handle on miss to avoid holding stale handle | 151 | 67 | _page_cache_handle = PageCacheHandle(); | 152 | 67 | } | 153 | 187 | } | 154 | | // NOTE: page cache lookup for *decompressed* page data is handled in | 155 | | // ColumnChunkReader::load_page_data(). PageReader should only be | 156 | | // responsible for parsing the header bytes from the file and saving | 157 | | // them in `_header_buf` for possible later insertion into the cache. | 158 | 67 | while (true) { | 159 | 67 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { | 160 | 0 | return Status::EndOfFile("stop"); | 161 | 0 | } | 162 | 67 | header_size = std::min(header_size, max_size); | 163 | 67 | { | 164 | 67 | SCOPED_RAW_TIMER(&_page_statistics.read_page_header_time); | 165 | 67 | RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx)); | 166 | 67 | } | 167 | 67 | real_header_size = cast_set<uint32_t>(header_size); | 168 | 67 | SCOPED_RAW_TIMER(&_page_statistics.decode_header_time); | 169 | 67 | auto st = | 170 | 67 | deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header); | 171 | 67 | if (st.ok()) { | 172 | 67 | break; | 173 | 67 | } | 174 | 0 | if (_offset + header_size >= _end_offset || real_header_size > MAX_PAGE_HEADER_SIZE) { | 175 | 0 | return Status::IOError( | 176 | 0 | "Failed to deserialize parquet page header. offset: {}, " | 177 | 0 | "header size: {}, end offset: {}, real header size: {}", | 178 | 0 | _offset, header_size, _end_offset, real_header_size); | 179 | 0 | } | 180 | 0 | header_size <<= 2; | 181 | 0 | } | 182 | | | 183 | 67 | if constexpr (OFFSET_INDEX == false) { | 184 | 67 | if (is_header_v2()) { | 185 | 2 | _end_row = _start_row + _cur_page_header.data_page_header_v2.num_rows; | 186 | 65 | } else if constexpr (!IN_COLLECTION) { | 187 | 65 | _end_row = _start_row + _cur_page_header.data_page_header.num_values; | 188 | 65 | } | 189 | 67 | } | 190 | | | 191 | | // Save header bytes for possible cache insertion later | 192 | 67 | _header_buf.assign(page_header_buf, page_header_buf + real_header_size); | 193 | 67 | _last_header_size = real_header_size; | 194 | 67 | _page_statistics.parse_page_header_num++; | 195 | 67 | _offset += real_header_size; | 196 | 67 | _next_header_offset = _offset + _cur_page_header.compressed_page_size; | 197 | 67 | _state = HEADER_PARSED; | 198 | 67 | return Status::OK(); | 199 | 67 | } |
|
200 | | |
201 | | template <bool IN_COLLECTION, bool OFFSET_INDEX> |
202 | 67 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) { |
203 | 67 | if (UNLIKELY(_state != HEADER_PARSED)) { |
204 | 0 | return Status::IOError("Should generate page header first to load current page data"); |
205 | 0 | } |
206 | 67 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { |
207 | 0 | return Status::EndOfFile("stop"); |
208 | 0 | } |
209 | 67 | slice.size = _cur_page_header.compressed_page_size; |
210 | 67 | RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx)); |
211 | 67 | _offset += slice.size; |
212 | 67 | _state = DATA_LOADED; |
213 | 67 | return Status::OK(); |
214 | 67 | } Unexecuted instantiation: _ZN5doris10PageReaderILb1ELb1EE13get_page_dataERNS_5SliceE _ZN5doris10PageReaderILb1ELb0EE13get_page_dataERNS_5SliceE Line | Count | Source | 202 | 1 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) { | 203 | 1 | if (UNLIKELY(_state != HEADER_PARSED)) { | 204 | 0 | return Status::IOError("Should generate page header first to load current page data"); | 205 | 0 | } | 206 | 1 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { | 207 | 0 | return Status::EndOfFile("stop"); | 208 | 0 | } | 209 | 1 | slice.size = _cur_page_header.compressed_page_size; | 210 | 1 | RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx)); | 211 | 1 | _offset += slice.size; | 212 | 1 | _state = DATA_LOADED; | 213 | 1 | return Status::OK(); | 214 | 1 | } |
Unexecuted instantiation: _ZN5doris10PageReaderILb0ELb1EE13get_page_dataERNS_5SliceE _ZN5doris10PageReaderILb0ELb0EE13get_page_dataERNS_5SliceE Line | Count | Source | 202 | 66 | Status PageReader<IN_COLLECTION, OFFSET_INDEX>::get_page_data(Slice& slice) { | 203 | 66 | if (UNLIKELY(_state != HEADER_PARSED)) { | 204 | 0 | return Status::IOError("Should generate page header first to load current page data"); | 205 | 0 | } | 206 | 66 | if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) { | 207 | 0 | return Status::EndOfFile("stop"); | 208 | 0 | } | 209 | 66 | slice.size = _cur_page_header.compressed_page_size; | 210 | 66 | RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx)); | 211 | 66 | _offset += slice.size; | 212 | 66 | _state = DATA_LOADED; | 213 | 66 | return Status::OK(); | 214 | 66 | } |
|
215 | | |
216 | | template class PageReader<true, true>; |
217 | | template class PageReader<true, false>; |
218 | | template class PageReader<false, true>; |
219 | | template class PageReader<false, false>; |
220 | | #include "common/compile_check_end.h" |
221 | | |
222 | | } // namespace doris |