be/src/storage/index/indexed_column_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/index/indexed_column_reader.h" |
19 | | |
20 | | #include <gen_cpp/segment_v2.pb.h> |
21 | | |
22 | | #include <algorithm> |
23 | | |
24 | | #include "common/status.h" |
25 | | #include "io/io_common.h" |
26 | | #include "storage/key_coder.h" |
27 | | #include "storage/olap_common.h" |
28 | | #include "storage/segment/encoding_info.h" // for EncodingInfo |
29 | | #include "storage/segment/options.h" |
30 | | #include "storage/segment/page_decoder.h" |
31 | | #include "storage/segment/page_io.h" |
32 | | #include "storage/types.h" |
33 | | #include "util/block_compression.h" |
34 | | #include "util/bvar_helper.h" |
35 | | |
36 | | namespace doris { |
37 | | using namespace ErrorCode; |
38 | | namespace segment_v2 { |
39 | | |
40 | | static bvar::Adder<uint64_t> g_index_reader_bytes("doris_pk", "index_reader_bytes"); |
41 | | static bvar::Adder<uint64_t> g_index_reader_compressed_bytes("doris_pk", |
42 | | "index_reader_compressed_bytes"); |
43 | | static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_bytes_per_second( |
44 | | "doris_pk", "index_reader_bytes_per_second", &g_index_reader_bytes, 60); |
45 | | static bvar::Adder<uint64_t> g_index_reader_pages("doris_pk", "index_reader_pages"); |
46 | | static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_pages_per_second( |
47 | | "doris_pk", "index_reader_pages_per_second", &g_index_reader_pages, 60); |
48 | | static bvar::Adder<uint64_t> g_index_reader_cached_pages("doris_pk", "index_reader_cached_pages"); |
49 | | static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_cached_pages_per_second( |
50 | | "doris_pk", "index_reader_cached_pages_per_second", &g_index_reader_cached_pages, 60); |
51 | | static bvar::Adder<uint64_t> g_index_reader_seek_count("doris_pk", "index_reader_seek_count"); |
52 | | static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_seek_per_second( |
53 | | "doris_pk", "index_reader_seek_per_second", &g_index_reader_seek_count, 60); |
54 | | static bvar::Adder<uint64_t> g_index_reader_pk_pages("doris_pk", "index_reader_pk_pages"); |
55 | | static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_pk_bytes_per_second( |
56 | | "doris_pk", "index_reader_pk_pages_per_second", &g_index_reader_pk_pages, 60); |
57 | | |
58 | 181 | int64_t IndexedColumnReader::get_metadata_size() const { |
59 | 181 | return sizeof(IndexedColumnReader) + _meta.ByteSizeLong(); |
60 | 181 | } |
61 | | |
62 | | Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory, |
63 | 181 | OlapReaderStatistics* index_load_stats) { |
64 | 181 | _use_page_cache = use_page_cache; |
65 | 181 | _kept_in_memory = kept_in_memory; |
66 | | |
67 | 181 | _type_info = get_scalar_type_info((FieldType)_meta.data_type()); |
68 | 181 | if (_type_info == nullptr) { |
69 | 0 | return Status::NotSupported("unsupported typeinfo, type={}", _meta.data_type()); |
70 | 0 | } |
71 | 181 | RETURN_IF_ERROR(EncodingInfo::get(_type_info->type(), _meta.encoding(), {}, &_encoding_info)); |
72 | 181 | _value_key_coder = get_key_coder(_type_info->type()); |
73 | | |
74 | | // read and parse ordinal index page when exists |
75 | 181 | if (_meta.has_ordinal_index_meta()) { |
76 | 181 | if (_meta.ordinal_index_meta().is_root_data_page()) { |
77 | 160 | _sole_data_page = PagePointer(_meta.ordinal_index_meta().root_page()); |
78 | 160 | } else { |
79 | 21 | RETURN_IF_ERROR(load_index_page(_meta.ordinal_index_meta().root_page(), |
80 | 21 | &_ordinal_index_page_handle, |
81 | 21 | _ordinal_index_reader.get(), index_load_stats)); |
82 | 21 | _has_index_page = true; |
83 | 21 | } |
84 | 181 | } |
85 | | |
86 | | // read and parse value index page when exists |
87 | 181 | if (_meta.has_value_index_meta()) { |
88 | 122 | if (_meta.value_index_meta().is_root_data_page()) { |
89 | 101 | _sole_data_page = PagePointer(_meta.value_index_meta().root_page()); |
90 | 101 | } else { |
91 | 21 | RETURN_IF_ERROR(load_index_page(_meta.value_index_meta().root_page(), |
92 | 21 | &_value_index_page_handle, _value_index_reader.get(), |
93 | 21 | index_load_stats)); |
94 | 21 | _has_index_page = true; |
95 | 21 | } |
96 | 122 | } |
97 | 181 | _num_values = _meta.num_values(); |
98 | | |
99 | 181 | update_metadata_size(); |
100 | 181 | return Status::OK(); |
101 | 181 | } |
102 | | |
103 | | Status IndexedColumnReader::load_index_page(const PagePointerPB& pp, PageHandle* handle, |
104 | | IndexPageReader* reader, |
105 | 42 | OlapReaderStatistics* index_load_stats) { |
106 | 42 | Slice body; |
107 | 42 | PageFooterPB footer; |
108 | 42 | BlockCompressionCodec* local_compress_codec; |
109 | 42 | RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &local_compress_codec)); |
110 | 42 | RETURN_IF_ERROR(read_page(PagePointer(pp), handle, &body, &footer, INDEX_PAGE, |
111 | 42 | local_compress_codec, false, index_load_stats)); |
112 | 42 | RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer())); |
113 | 42 | _mem_size += body.get_size(); |
114 | 42 | return Status::OK(); |
115 | 42 | } |
116 | | |
117 | | Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, Slice* body, |
118 | | PageFooterPB* footer, PageTypePB type, |
119 | | BlockCompressionCodec* codec, bool pre_decode, |
120 | 300 | OlapReaderStatistics* stats) const { |
121 | 300 | OlapReaderStatistics tmp_stats; |
122 | 300 | OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats; |
123 | 300 | PageReadOptions opts(io::IOContext {.is_index_data = true, |
124 | 300 | .file_cache_stats = &stats_ptr->file_cache_stats}); |
125 | 300 | opts.use_page_cache = _use_page_cache; |
126 | 300 | opts.kept_in_memory = _kept_in_memory; |
127 | 300 | opts.pre_decode = pre_decode; |
128 | 300 | opts.type = type; |
129 | 300 | opts.file_reader = _file_reader.get(); |
130 | 300 | opts.page_pointer = pp; |
131 | 300 | opts.codec = codec; |
132 | 300 | opts.stats = stats_ptr; |
133 | 300 | opts.encoding_info = _encoding_info; |
134 | | |
135 | 300 | if (_is_pk_index) { |
136 | 176 | opts.type = PRIMARY_KEY_INDEX_PAGE; |
137 | 176 | } |
138 | 300 | auto st = PageIO::read_and_decompress_page(opts, handle, body, footer); |
139 | 300 | g_index_reader_compressed_bytes << pp.size; |
140 | 300 | g_index_reader_bytes << footer->uncompressed_size(); |
141 | 300 | g_index_reader_pages << 1; |
142 | 300 | g_index_reader_cached_pages << tmp_stats.cached_pages_num; |
143 | 300 | return st; |
144 | 300 | } |
145 | | |
// Defaulted, but defined out-of-line in this .cpp.
// NOTE(review): presumably so member destructors (e.g. the index page readers) are
// instantiated where their types are complete — confirm against the header.
IndexedColumnReader::~IndexedColumnReader() = default;
147 | | |
148 | | /////////////////////////////////////////////////////////////////////////////// |
149 | | |
150 | 258 | Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) { |
151 | 258 | Status status; |
152 | | // there is not init() for IndexedColumnIterator, so do it here |
153 | 258 | if (!_compress_codec) { |
154 | 187 | RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), &_compress_codec)); |
155 | 187 | } |
156 | | |
157 | 258 | PageHandle handle; |
158 | 258 | Slice body; |
159 | 258 | PageFooterPB footer; |
160 | 258 | RETURN_IF_ERROR(_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec, |
161 | 258 | true, _stats)); |
162 | | // parse data page |
163 | | // note that page_index is not used in IndexedColumnIterator, so we pass 0 |
164 | 258 | PageDecoderOptions opts; |
165 | 258 | opts.need_check_bitmap = false; |
166 | 258 | status = ParsedPage::create(std::move(handle), body, footer.data_page_footer(), |
167 | 258 | _reader->encoding_info(), pp, 0, &_data_page, opts); |
168 | 258 | if (!status.ok()) { |
169 | 0 | LOG(WARNING) << "failed to create ParsedPage in IndexedColumnIterator, file=" |
170 | 0 | << _reader->_file_reader->path().native() << ", page_offset=" << pp.offset |
171 | 0 | << ", page_size=" << pp.size << ", error=" << status; |
172 | 0 | } |
173 | 258 | DCHECK(_reader->_meta.ordinal_index_meta().is_root_data_page() |
174 | 258 | ? _reader->_meta.num_values() == _data_page.num_rows |
175 | 258 | : true); |
176 | 258 | return status; |
177 | 258 | } |
178 | | |
179 | 354 | Status IndexedColumnIterator::seek_to_ordinal(ordinal_t idx) { |
180 | 354 | DCHECK(idx <= _reader->num_values()); |
181 | | |
182 | 354 | if (!_reader->support_ordinal_seek()) { |
183 | 0 | return Status::NotSupported("no ordinal index"); |
184 | 0 | } |
185 | | |
186 | | // it's ok to seek past the last value |
187 | 354 | if (idx == _reader->num_values()) { |
188 | 1 | _current_ordinal = idx; |
189 | 1 | _seeked = true; |
190 | 1 | return Status::OK(); |
191 | 1 | } |
192 | | |
193 | 353 | if (!_data_page || !_data_page.contains(idx)) { |
194 | | // need to read the data page containing row at idx |
195 | 190 | if (_reader->_has_index_page) { |
196 | 38 | std::string key; |
197 | 38 | KeyCoderTraits<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>::full_encode_ascending(&idx, |
198 | 38 | &key); |
199 | 38 | RETURN_IF_ERROR(_ordinal_iter.seek_at_or_before(key)); |
200 | 38 | RETURN_IF_ERROR(_read_data_page(_ordinal_iter.current_page_pointer())); |
201 | 38 | _current_iter = &_ordinal_iter; |
202 | 152 | } else { |
203 | 152 | RETURN_IF_ERROR(_read_data_page(_reader->_sole_data_page)); |
204 | 152 | } |
205 | 190 | } |
206 | | |
207 | 353 | ordinal_t offset_in_page = idx - _data_page.first_ordinal; |
208 | 353 | RETURN_IF_ERROR(_data_page.data_decoder->seek_to_position_in_page(offset_in_page)); |
209 | 353 | DCHECK(offset_in_page == _data_page.data_decoder->current_index()); |
210 | 353 | _data_page.offset_in_page = offset_in_page; |
211 | 353 | _current_ordinal = idx; |
212 | 353 | _seeked = true; |
213 | 353 | return Status::OK(); |
214 | 353 | } |
215 | | |
216 | 5.23k | Status IndexedColumnIterator::seek_at_or_after(const void* key, bool* exact_match) { |
217 | 5.23k | if (!_reader->support_value_seek()) { |
218 | 0 | return Status::NotSupported("no value index"); |
219 | 0 | } |
220 | | |
221 | 5.23k | if (_reader->num_values() == 0) { |
222 | 0 | return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("value index is empty "); |
223 | 0 | } |
224 | | |
225 | 5.23k | g_index_reader_seek_count << 1; |
226 | | |
227 | 5.23k | bool load_data_page = false; |
228 | 5.23k | PagePointer data_page_pp; |
229 | 5.23k | if (_reader->_has_index_page) { |
230 | | // seek index to determine the data page to seek |
231 | 102 | std::string encoded_key; |
232 | 102 | _reader->_value_key_coder->full_encode_ascending(key, &encoded_key); |
233 | 102 | Status st = _value_iter.seek_at_or_before(encoded_key); |
234 | 102 | if (st.is<ENTRY_NOT_FOUND>()) { |
235 | | // all keys in page is greater than `encoded_key`, point to the first page. |
236 | | // otherwise, we may missing some pages. |
237 | | // For example, the predicate is `col1 > 2`, and the index page is [3,5,7]. |
238 | | // so the `seek_at_or_before(2)` will return Status::Error<ENTRY_NOT_FOUND>(). |
239 | | // But actually, we expect it to point to page `3`. |
240 | 0 | _value_iter.seek_to_first(); |
241 | 102 | } else if (!st.ok()) { |
242 | 0 | return st; |
243 | 0 | } |
244 | 102 | data_page_pp = _value_iter.current_page_pointer(); |
245 | 102 | _current_iter = &_value_iter; |
246 | 102 | if (!_data_page || _data_page.page_pointer != data_page_pp) { |
247 | | // load when it's not the same with the current |
248 | 24 | load_data_page = true; |
249 | 24 | } |
250 | 5.13k | } else if (!_data_page) { |
251 | | // no index page, load data page for the first time |
252 | 14 | load_data_page = true; |
253 | 14 | data_page_pp = PagePointer(_reader->_sole_data_page); |
254 | 14 | } |
255 | | |
256 | 5.23k | if (load_data_page) { |
257 | 38 | RETURN_IF_ERROR(_read_data_page(data_page_pp)); |
258 | 38 | } |
259 | | |
260 | | // seek inside data page |
261 | 5.23k | Status st = _data_page.data_decoder->seek_at_or_after_value(key, exact_match); |
262 | | // return the first row of next page when not found |
263 | 5.23k | if (st.is<ENTRY_NOT_FOUND>() && _reader->_has_index_page) { |
264 | 4 | if (_value_iter.has_next()) { |
265 | 3 | _seeked = true; |
266 | 3 | *exact_match = false; |
267 | 3 | _current_ordinal = _data_page.first_ordinal + _data_page.num_rows; |
268 | | // move offset to the end of the page |
269 | 3 | _data_page.offset_in_page = _data_page.num_rows; |
270 | 3 | return Status::OK(); |
271 | 3 | } |
272 | 4 | } |
273 | 5.23k | RETURN_IF_ERROR(st); |
274 | 5.22k | _data_page.offset_in_page = _data_page.data_decoder->current_index(); |
275 | 5.22k | _current_ordinal = _data_page.first_ordinal + _data_page.offset_in_page; |
276 | 5.22k | DCHECK(_data_page.contains(_current_ordinal)); |
277 | 5.22k | _seeked = true; |
278 | 5.22k | return Status::OK(); |
279 | 5.23k | } |
280 | | |
281 | 354 | Status IndexedColumnIterator::next_batch(size_t* n, MutableColumnPtr& dst) { |
282 | 354 | DCHECK(_seeked); |
283 | 354 | if (_current_ordinal == _reader->num_values()) { |
284 | 0 | *n = 0; |
285 | 0 | return Status::OK(); |
286 | 0 | } |
287 | | |
288 | 354 | size_t remaining = *n; |
289 | 738 | while (remaining > 0) { |
290 | 384 | if (!_data_page.has_remaining()) { |
291 | | // trying to read next data page |
292 | 30 | if (!_reader->_has_index_page) { |
293 | 0 | break; // no more data page |
294 | 0 | } |
295 | 30 | bool has_next = _current_iter->move_next(); |
296 | 30 | if (!has_next) { |
297 | 0 | break; // no more data page |
298 | 0 | } |
299 | 30 | RETURN_IF_ERROR(_read_data_page(_current_iter->current_page_pointer())); |
300 | 30 | } |
301 | | |
302 | 384 | size_t rows_to_read = std::min(_data_page.remaining(), remaining); |
303 | 384 | size_t rows_read = rows_to_read; |
304 | 384 | RETURN_IF_ERROR(_data_page.data_decoder->next_batch(&rows_read, dst)); |
305 | 384 | DCHECK(rows_to_read == rows_read); |
306 | | |
307 | 384 | _data_page.offset_in_page += rows_read; |
308 | 384 | _current_ordinal += rows_read; |
309 | 384 | remaining -= rows_read; |
310 | 384 | } |
311 | 354 | *n -= remaining; |
312 | 354 | _seeked = false; |
313 | 354 | return Status::OK(); |
314 | 354 | } |
315 | | |
316 | | } // namespace segment_v2 |
317 | | } // namespace doris |