Coverage Report

Created: 2026-03-15 20:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/index/indexed_column_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/index/indexed_column_reader.h"
19
20
#include <gen_cpp/segment_v2.pb.h>
21
22
#include <algorithm>
23
24
#include "common/status.h"
25
#include "io/io_common.h"
26
#include "storage/key_coder.h"
27
#include "storage/olap_common.h"
28
#include "storage/segment/encoding_info.h" // for EncodingInfo
29
#include "storage/segment/options.h"
30
#include "storage/segment/page_decoder.h"
31
#include "storage/segment/page_io.h"
32
#include "storage/types.h"
33
#include "util/block_compression.h"
34
#include "util/bvar_helper.h"
35
36
namespace doris {
37
using namespace ErrorCode;
38
namespace segment_v2 {
39
40
static bvar::Adder<uint64_t> g_index_reader_bytes("doris_pk", "index_reader_bytes");
41
static bvar::Adder<uint64_t> g_index_reader_compressed_bytes("doris_pk",
42
                                                             "index_reader_compressed_bytes");
43
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_bytes_per_second(
44
        "doris_pk", "index_reader_bytes_per_second", &g_index_reader_bytes, 60);
45
static bvar::Adder<uint64_t> g_index_reader_pages("doris_pk", "index_reader_pages");
46
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_pages_per_second(
47
        "doris_pk", "index_reader_pages_per_second", &g_index_reader_pages, 60);
48
static bvar::Adder<uint64_t> g_index_reader_cached_pages("doris_pk", "index_reader_cached_pages");
49
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_cached_pages_per_second(
50
        "doris_pk", "index_reader_cached_pages_per_second", &g_index_reader_cached_pages, 60);
51
static bvar::Adder<uint64_t> g_index_reader_seek_count("doris_pk", "index_reader_seek_count");
52
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_seek_per_second(
53
        "doris_pk", "index_reader_seek_per_second", &g_index_reader_seek_count, 60);
54
static bvar::Adder<uint64_t> g_index_reader_pk_pages("doris_pk", "index_reader_pk_pages");
55
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_pk_bytes_per_second(
56
        "doris_pk", "index_reader_pk_pages_per_second", &g_index_reader_pk_pages, 60);
57
58
181
int64_t IndexedColumnReader::get_metadata_size() const {
59
181
    return sizeof(IndexedColumnReader) + _meta.ByteSizeLong();
60
181
}
61
62
Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory,
63
181
                                 OlapReaderStatistics* index_load_stats) {
64
181
    _use_page_cache = use_page_cache;
65
181
    _kept_in_memory = kept_in_memory;
66
67
181
    _type_info = get_scalar_type_info((FieldType)_meta.data_type());
68
181
    if (_type_info == nullptr) {
69
0
        return Status::NotSupported("unsupported typeinfo, type={}", _meta.data_type());
70
0
    }
71
181
    RETURN_IF_ERROR(EncodingInfo::get(_type_info->type(), _meta.encoding(), {}, &_encoding_info));
72
181
    _value_key_coder = get_key_coder(_type_info->type());
73
74
    // read and parse ordinal index page when exists
75
181
    if (_meta.has_ordinal_index_meta()) {
76
181
        if (_meta.ordinal_index_meta().is_root_data_page()) {
77
160
            _sole_data_page = PagePointer(_meta.ordinal_index_meta().root_page());
78
160
        } else {
79
21
            RETURN_IF_ERROR(load_index_page(_meta.ordinal_index_meta().root_page(),
80
21
                                            &_ordinal_index_page_handle,
81
21
                                            _ordinal_index_reader.get(), index_load_stats));
82
21
            _has_index_page = true;
83
21
        }
84
181
    }
85
86
    // read and parse value index page when exists
87
181
    if (_meta.has_value_index_meta()) {
88
122
        if (_meta.value_index_meta().is_root_data_page()) {
89
101
            _sole_data_page = PagePointer(_meta.value_index_meta().root_page());
90
101
        } else {
91
21
            RETURN_IF_ERROR(load_index_page(_meta.value_index_meta().root_page(),
92
21
                                            &_value_index_page_handle, _value_index_reader.get(),
93
21
                                            index_load_stats));
94
21
            _has_index_page = true;
95
21
        }
96
122
    }
97
181
    _num_values = _meta.num_values();
98
99
181
    update_metadata_size();
100
181
    return Status::OK();
101
181
}
102
103
Status IndexedColumnReader::load_index_page(const PagePointerPB& pp, PageHandle* handle,
104
                                            IndexPageReader* reader,
105
42
                                            OlapReaderStatistics* index_load_stats) {
106
42
    Slice body;
107
42
    PageFooterPB footer;
108
42
    BlockCompressionCodec* local_compress_codec;
109
42
    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &local_compress_codec));
110
42
    RETURN_IF_ERROR(read_page(PagePointer(pp), handle, &body, &footer, INDEX_PAGE,
111
42
                              local_compress_codec, false, index_load_stats));
112
42
    RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer()));
113
42
    _mem_size += body.get_size();
114
42
    return Status::OK();
115
42
}
116
117
Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, Slice* body,
118
                                      PageFooterPB* footer, PageTypePB type,
119
                                      BlockCompressionCodec* codec, bool pre_decode,
120
300
                                      OlapReaderStatistics* stats) const {
121
300
    OlapReaderStatistics tmp_stats;
122
300
    OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
123
300
    PageReadOptions opts(io::IOContext {.is_index_data = true,
124
300
                                        .file_cache_stats = &stats_ptr->file_cache_stats});
125
300
    opts.use_page_cache = _use_page_cache;
126
300
    opts.kept_in_memory = _kept_in_memory;
127
300
    opts.pre_decode = pre_decode;
128
300
    opts.type = type;
129
300
    opts.file_reader = _file_reader.get();
130
300
    opts.page_pointer = pp;
131
300
    opts.codec = codec;
132
300
    opts.stats = stats_ptr;
133
300
    opts.encoding_info = _encoding_info;
134
135
300
    if (_is_pk_index) {
136
176
        opts.type = PRIMARY_KEY_INDEX_PAGE;
137
176
    }
138
300
    auto st = PageIO::read_and_decompress_page(opts, handle, body, footer);
139
300
    g_index_reader_compressed_bytes << pp.size;
140
300
    g_index_reader_bytes << footer->uncompressed_size();
141
300
    g_index_reader_pages << 1;
142
300
    g_index_reader_cached_pages << tmp_stats.cached_pages_num;
143
300
    return st;
144
300
}
145
146
181
IndexedColumnReader::~IndexedColumnReader() = default;
147
148
///////////////////////////////////////////////////////////////////////////////
149
150
258
Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) {
151
258
    Status status;
152
    // there is not init() for IndexedColumnIterator, so do it here
153
258
    if (!_compress_codec) {
154
187
        RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), &_compress_codec));
155
187
    }
156
157
258
    PageHandle handle;
158
258
    Slice body;
159
258
    PageFooterPB footer;
160
258
    RETURN_IF_ERROR(_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec,
161
258
                                       true, _stats));
162
    // parse data page
163
    // note that page_index is not used in IndexedColumnIterator, so we pass 0
164
258
    PageDecoderOptions opts;
165
258
    opts.need_check_bitmap = false;
166
258
    status = ParsedPage::create(std::move(handle), body, footer.data_page_footer(),
167
258
                                _reader->encoding_info(), pp, 0, &_data_page, opts);
168
258
    if (!status.ok()) {
169
0
        LOG(WARNING) << "failed to create ParsedPage in IndexedColumnIterator, file="
170
0
                     << _reader->_file_reader->path().native() << ", page_offset=" << pp.offset
171
0
                     << ", page_size=" << pp.size << ", error=" << status;
172
0
    }
173
258
    DCHECK(_reader->_meta.ordinal_index_meta().is_root_data_page()
174
258
                   ? _reader->_meta.num_values() == _data_page.num_rows
175
258
                   : true);
176
258
    return status;
177
258
}
178
179
354
Status IndexedColumnIterator::seek_to_ordinal(ordinal_t idx) {
180
354
    DCHECK(idx <= _reader->num_values());
181
182
354
    if (!_reader->support_ordinal_seek()) {
183
0
        return Status::NotSupported("no ordinal index");
184
0
    }
185
186
    // it's ok to seek past the last value
187
354
    if (idx == _reader->num_values()) {
188
1
        _current_ordinal = idx;
189
1
        _seeked = true;
190
1
        return Status::OK();
191
1
    }
192
193
353
    if (!_data_page || !_data_page.contains(idx)) {
194
        // need to read the data page containing row at idx
195
190
        if (_reader->_has_index_page) {
196
38
            std::string key;
197
38
            KeyCoderTraits<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>::full_encode_ascending(&idx,
198
38
                                                                                              &key);
199
38
            RETURN_IF_ERROR(_ordinal_iter.seek_at_or_before(key));
200
38
            RETURN_IF_ERROR(_read_data_page(_ordinal_iter.current_page_pointer()));
201
38
            _current_iter = &_ordinal_iter;
202
152
        } else {
203
152
            RETURN_IF_ERROR(_read_data_page(_reader->_sole_data_page));
204
152
        }
205
190
    }
206
207
353
    ordinal_t offset_in_page = idx - _data_page.first_ordinal;
208
353
    RETURN_IF_ERROR(_data_page.data_decoder->seek_to_position_in_page(offset_in_page));
209
353
    DCHECK(offset_in_page == _data_page.data_decoder->current_index());
210
353
    _data_page.offset_in_page = offset_in_page;
211
353
    _current_ordinal = idx;
212
353
    _seeked = true;
213
353
    return Status::OK();
214
353
}
215
216
5.23k
Status IndexedColumnIterator::seek_at_or_after(const void* key, bool* exact_match) {
217
5.23k
    if (!_reader->support_value_seek()) {
218
0
        return Status::NotSupported("no value index");
219
0
    }
220
221
5.23k
    if (_reader->num_values() == 0) {
222
0
        return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("value index is empty ");
223
0
    }
224
225
5.23k
    g_index_reader_seek_count << 1;
226
227
5.23k
    bool load_data_page = false;
228
5.23k
    PagePointer data_page_pp;
229
5.23k
    if (_reader->_has_index_page) {
230
        // seek index to determine the data page to seek
231
102
        std::string encoded_key;
232
102
        _reader->_value_key_coder->full_encode_ascending(key, &encoded_key);
233
102
        Status st = _value_iter.seek_at_or_before(encoded_key);
234
102
        if (st.is<ENTRY_NOT_FOUND>()) {
235
            // all keys in page is greater than `encoded_key`, point to the first page.
236
            // otherwise, we may missing some pages.
237
            // For example, the predicate is `col1 > 2`, and the index page is [3,5,7].
238
            // so the `seek_at_or_before(2)` will return Status::Error<ENTRY_NOT_FOUND>().
239
            // But actually, we expect it to point to page `3`.
240
0
            _value_iter.seek_to_first();
241
102
        } else if (!st.ok()) {
242
0
            return st;
243
0
        }
244
102
        data_page_pp = _value_iter.current_page_pointer();
245
102
        _current_iter = &_value_iter;
246
102
        if (!_data_page || _data_page.page_pointer != data_page_pp) {
247
            // load when it's not the same with the current
248
24
            load_data_page = true;
249
24
        }
250
5.13k
    } else if (!_data_page) {
251
        // no index page, load data page for the first time
252
14
        load_data_page = true;
253
14
        data_page_pp = PagePointer(_reader->_sole_data_page);
254
14
    }
255
256
5.23k
    if (load_data_page) {
257
38
        RETURN_IF_ERROR(_read_data_page(data_page_pp));
258
38
    }
259
260
    // seek inside data page
261
5.23k
    Status st = _data_page.data_decoder->seek_at_or_after_value(key, exact_match);
262
    // return the first row of next page when not found
263
5.23k
    if (st.is<ENTRY_NOT_FOUND>() && _reader->_has_index_page) {
264
4
        if (_value_iter.has_next()) {
265
3
            _seeked = true;
266
3
            *exact_match = false;
267
3
            _current_ordinal = _data_page.first_ordinal + _data_page.num_rows;
268
            // move offset to the end of the page
269
3
            _data_page.offset_in_page = _data_page.num_rows;
270
3
            return Status::OK();
271
3
        }
272
4
    }
273
5.23k
    RETURN_IF_ERROR(st);
274
5.22k
    _data_page.offset_in_page = _data_page.data_decoder->current_index();
275
5.22k
    _current_ordinal = _data_page.first_ordinal + _data_page.offset_in_page;
276
5.22k
    DCHECK(_data_page.contains(_current_ordinal));
277
5.22k
    _seeked = true;
278
5.22k
    return Status::OK();
279
5.23k
}
280
281
354
Status IndexedColumnIterator::next_batch(size_t* n, MutableColumnPtr& dst) {
282
354
    DCHECK(_seeked);
283
354
    if (_current_ordinal == _reader->num_values()) {
284
0
        *n = 0;
285
0
        return Status::OK();
286
0
    }
287
288
354
    size_t remaining = *n;
289
738
    while (remaining > 0) {
290
384
        if (!_data_page.has_remaining()) {
291
            // trying to read next data page
292
30
            if (!_reader->_has_index_page) {
293
0
                break; // no more data page
294
0
            }
295
30
            bool has_next = _current_iter->move_next();
296
30
            if (!has_next) {
297
0
                break; // no more data page
298
0
            }
299
30
            RETURN_IF_ERROR(_read_data_page(_current_iter->current_page_pointer()));
300
30
        }
301
302
384
        size_t rows_to_read = std::min(_data_page.remaining(), remaining);
303
384
        size_t rows_read = rows_to_read;
304
384
        RETURN_IF_ERROR(_data_page.data_decoder->next_batch(&rows_read, dst));
305
384
        DCHECK(rows_to_read == rows_read);
306
307
384
        _data_page.offset_in_page += rows_read;
308
384
        _current_ordinal += rows_read;
309
384
        remaining -= rows_read;
310
384
    }
311
354
    *n -= remaining;
312
354
    _seeked = false;
313
354
    return Status::OK();
314
354
}
315
316
} // namespace segment_v2
317
} // namespace doris