Coverage Report

Created: 2026-03-24 20:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/index/indexed_column_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/index/indexed_column_reader.h"
19
20
#include <gen_cpp/segment_v2.pb.h>
21
22
#include <algorithm>
23
24
#include "common/status.h"
25
#include "io/io_common.h"
26
#include "storage/key_coder.h"
27
#include "storage/olap_common.h"
28
#include "storage/segment/encoding_info.h" // for EncodingInfo
29
#include "storage/segment/options.h"
30
#include "storage/segment/page_decoder.h"
31
#include "storage/segment/page_io.h"
32
#include "storage/types.h"
33
#include "util/block_compression.h"
34
#include "util/bvar_helper.h"
35
36
namespace doris {
37
using namespace ErrorCode;
38
namespace segment_v2 {
39
40
static bvar::Adder<uint64_t> g_index_reader_bytes("doris_pk", "index_reader_bytes");
41
static bvar::Adder<uint64_t> g_index_reader_compressed_bytes("doris_pk",
42
                                                             "index_reader_compressed_bytes");
43
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_bytes_per_second(
44
        "doris_pk", "index_reader_bytes_per_second", &g_index_reader_bytes, 60);
45
static bvar::Adder<uint64_t> g_index_reader_pages("doris_pk", "index_reader_pages");
46
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_pages_per_second(
47
        "doris_pk", "index_reader_pages_per_second", &g_index_reader_pages, 60);
48
static bvar::Adder<uint64_t> g_index_reader_cached_pages("doris_pk", "index_reader_cached_pages");
49
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_cached_pages_per_second(
50
        "doris_pk", "index_reader_cached_pages_per_second", &g_index_reader_cached_pages, 60);
51
static bvar::Adder<uint64_t> g_index_reader_seek_count("doris_pk", "index_reader_seek_count");
52
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_seek_per_second(
53
        "doris_pk", "index_reader_seek_per_second", &g_index_reader_seek_count, 60);
54
static bvar::Adder<uint64_t> g_index_reader_pk_pages("doris_pk", "index_reader_pk_pages");
55
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_pk_bytes_per_second(
56
        "doris_pk", "index_reader_pk_pages_per_second", &g_index_reader_pk_pages, 60);
57
58
370
int64_t IndexedColumnReader::get_metadata_size() const {
59
370
    return sizeof(IndexedColumnReader) + _meta.ByteSizeLong();
60
370
}
61
62
Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory,
63
370
                                 OlapReaderStatistics* index_load_stats) {
64
370
    _use_page_cache = use_page_cache;
65
370
    _kept_in_memory = kept_in_memory;
66
67
370
    _type_info = get_scalar_type_info((FieldType)_meta.data_type());
68
370
    if (_type_info == nullptr) {
69
0
        return Status::NotSupported("unsupported typeinfo, type={}", _meta.data_type());
70
0
    }
71
370
    RETURN_IF_ERROR(EncodingInfo::get(_type_info->type(), _meta.encoding(), {}, &_encoding_info));
72
370
    _value_key_coder = get_key_coder(_type_info->type());
73
74
    // read and parse ordinal index page when exists
75
370
    if (_meta.has_ordinal_index_meta()) {
76
370
        if (_meta.ordinal_index_meta().is_root_data_page()) {
77
328
            _sole_data_page = PagePointer(_meta.ordinal_index_meta().root_page());
78
328
        } else {
79
42
            RETURN_IF_ERROR(load_index_page(_meta.ordinal_index_meta().root_page(),
80
42
                                            &_ordinal_index_page_handle,
81
42
                                            _ordinal_index_reader.get(), index_load_stats));
82
42
            _has_index_page = true;
83
42
        }
84
370
    }
85
86
    // read and parse value index page when exists
87
370
    if (_meta.has_value_index_meta()) {
88
252
        if (_meta.value_index_meta().is_root_data_page()) {
89
210
            _sole_data_page = PagePointer(_meta.value_index_meta().root_page());
90
210
        } else {
91
42
            RETURN_IF_ERROR(load_index_page(_meta.value_index_meta().root_page(),
92
42
                                            &_value_index_page_handle, _value_index_reader.get(),
93
42
                                            index_load_stats));
94
42
            _has_index_page = true;
95
42
        }
96
252
    }
97
370
    _num_values = _meta.num_values();
98
99
370
    update_metadata_size();
100
370
    return Status::OK();
101
370
}
102
103
Status IndexedColumnReader::load_index_page(const PagePointerPB& pp, PageHandle* handle,
104
                                            IndexPageReader* reader,
105
84
                                            OlapReaderStatistics* index_load_stats) {
106
84
    Slice body;
107
84
    PageFooterPB footer;
108
84
    BlockCompressionCodec* local_compress_codec;
109
84
    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &local_compress_codec));
110
84
    RETURN_IF_ERROR(read_page(PagePointer(pp), handle, &body, &footer, INDEX_PAGE,
111
84
                              local_compress_codec, false, index_load_stats));
112
84
    RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer()));
113
84
    _mem_size += body.get_size();
114
84
    return Status::OK();
115
84
}
116
117
Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, Slice* body,
118
                                      PageFooterPB* footer, PageTypePB type,
119
                                      BlockCompressionCodec* codec, bool pre_decode,
120
608
                                      OlapReaderStatistics* stats) const {
121
608
    OlapReaderStatistics tmp_stats;
122
608
    OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
123
608
    PageReadOptions opts(io::IOContext {.is_index_data = true,
124
608
                                        .file_cache_stats = &stats_ptr->file_cache_stats});
125
608
    opts.use_page_cache = _use_page_cache;
126
608
    opts.kept_in_memory = _kept_in_memory;
127
608
    opts.pre_decode = pre_decode;
128
608
    opts.type = type;
129
608
    opts.file_reader = _file_reader.get();
130
608
    opts.page_pointer = pp;
131
608
    opts.codec = codec;
132
608
    opts.stats = stats_ptr;
133
608
    opts.encoding_info = _encoding_info;
134
135
608
    if (_is_pk_index) {
136
352
        opts.type = PRIMARY_KEY_INDEX_PAGE;
137
352
    }
138
608
    auto st = PageIO::read_and_decompress_page(opts, handle, body, footer);
139
608
    g_index_reader_compressed_bytes << pp.size;
140
608
    g_index_reader_bytes << footer->uncompressed_size();
141
608
    g_index_reader_pages << 1;
142
608
    g_index_reader_cached_pages << tmp_stats.cached_pages_num;
143
608
    return st;
144
608
}
145
146
370
IndexedColumnReader::~IndexedColumnReader() = default;
147
148
///////////////////////////////////////////////////////////////////////////////
149
150
524
Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) {
151
524
    Status status;
152
    // there is not init() for IndexedColumnIterator, so do it here
153
524
    if (!_compress_codec) {
154
382
        RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), &_compress_codec));
155
382
    }
156
157
524
    PageHandle handle;
158
524
    Slice body;
159
524
    PageFooterPB footer;
160
524
    RETURN_IF_ERROR(_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec,
161
524
                                       true, _stats));
162
    // parse data page
163
    // note that page_index is not used in IndexedColumnIterator, so we pass 0
164
524
    PageDecoderOptions opts;
165
524
    opts.need_check_bitmap = false;
166
524
    status = ParsedPage::create(std::move(handle), body, footer.data_page_footer(),
167
524
                                _reader->encoding_info(), pp, 0, &_data_page, opts);
168
524
    if (!status.ok()) {
169
0
        LOG(WARNING) << "failed to create ParsedPage in IndexedColumnIterator, file="
170
0
                     << _reader->_file_reader->path().native() << ", page_offset=" << pp.offset
171
0
                     << ", page_size=" << pp.size << ", error=" << status;
172
0
    }
173
524
    DCHECK(_reader->_meta.ordinal_index_meta().is_root_data_page()
174
524
                   ? _reader->_meta.num_values() == _data_page.num_rows
175
524
                   : true);
176
524
    return status;
177
524
}
178
179
716
Status IndexedColumnIterator::seek_to_ordinal(ordinal_t idx) {
180
716
    DCHECK(idx <= _reader->num_values());
181
182
716
    if (!_reader->support_ordinal_seek()) {
183
0
        return Status::NotSupported("no ordinal index");
184
0
    }
185
186
    // it's ok to seek past the last value
187
716
    if (idx == _reader->num_values()) {
188
2
        _current_ordinal = idx;
189
2
        _seeked = true;
190
2
        return Status::OK();
191
2
    }
192
193
714
    if (!_data_page || !_data_page.contains(idx)) {
194
        // need to read the data page containing row at idx
195
388
        if (_reader->_has_index_page) {
196
76
            std::string key;
197
76
            KeyCoderTraits<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>::full_encode_ascending(&idx,
198
76
                                                                                              &key);
199
76
            RETURN_IF_ERROR(_ordinal_iter.seek_at_or_before(key));
200
76
            RETURN_IF_ERROR(_read_data_page(_ordinal_iter.current_page_pointer()));
201
76
            _current_iter = &_ordinal_iter;
202
312
        } else {
203
312
            RETURN_IF_ERROR(_read_data_page(_reader->_sole_data_page));
204
312
        }
205
388
    }
206
207
714
    ordinal_t offset_in_page = idx - _data_page.first_ordinal;
208
714
    RETURN_IF_ERROR(_data_page.data_decoder->seek_to_position_in_page(offset_in_page));
209
714
    DCHECK(offset_in_page == _data_page.data_decoder->current_index());
210
714
    _data_page.offset_in_page = offset_in_page;
211
714
    _current_ordinal = idx;
212
714
    _seeked = true;
213
714
    return Status::OK();
214
714
}
215
216
10.4k
Status IndexedColumnIterator::seek_at_or_after(const void* key, bool* exact_match) {
217
10.4k
    if (!_reader->support_value_seek()) {
218
0
        return Status::NotSupported("no value index");
219
0
    }
220
221
10.4k
    if (_reader->num_values() == 0) {
222
0
        return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("value index is empty ");
223
0
    }
224
225
10.4k
    g_index_reader_seek_count << 1;
226
227
10.4k
    bool load_data_page = false;
228
10.4k
    PagePointer data_page_pp;
229
10.4k
    if (_reader->_has_index_page) {
230
        // seek index to determine the data page to seek
231
204
        std::string encoded_key;
232
204
        _reader->_value_key_coder->full_encode_ascending(key, &encoded_key);
233
204
        Status st = _value_iter.seek_at_or_before(encoded_key);
234
204
        if (st.is<ENTRY_NOT_FOUND>()) {
235
            // all keys in page is greater than `encoded_key`, point to the first page.
236
            // otherwise, we may missing some pages.
237
            // For example, the predicate is `col1 > 2`, and the index page is [3,5,7].
238
            // so the `seek_at_or_before(2)` will return Status::Error<ENTRY_NOT_FOUND>().
239
            // But actually, we expect it to point to page `3`.
240
0
            _value_iter.seek_to_first();
241
204
        } else if (!st.ok()) {
242
0
            return st;
243
0
        }
244
204
        data_page_pp = _value_iter.current_page_pointer();
245
204
        _current_iter = &_value_iter;
246
204
        if (!_data_page || _data_page.page_pointer != data_page_pp) {
247
            // load when it's not the same with the current
248
48
            load_data_page = true;
249
48
        }
250
10.2k
    } else if (!_data_page) {
251
        // no index page, load data page for the first time
252
28
        load_data_page = true;
253
28
        data_page_pp = PagePointer(_reader->_sole_data_page);
254
28
    }
255
256
10.4k
    if (load_data_page) {
257
76
        RETURN_IF_ERROR(_read_data_page(data_page_pp));
258
76
    }
259
260
    // seek inside data page
261
10.4k
    Status st = _data_page.data_decoder->seek_at_or_after_value(key, exact_match);
262
    // return the first row of next page when not found
263
10.4k
    if (st.is<ENTRY_NOT_FOUND>() && _reader->_has_index_page) {
264
8
        if (_value_iter.has_next()) {
265
6
            _seeked = true;
266
6
            *exact_match = false;
267
6
            _current_ordinal = _data_page.first_ordinal + _data_page.num_rows;
268
            // move offset to the end of the page
269
6
            _data_page.offset_in_page = _data_page.num_rows;
270
6
            return Status::OK();
271
6
        }
272
8
    }
273
10.4k
    RETURN_IF_ERROR(st);
274
10.4k
    _data_page.offset_in_page = _data_page.data_decoder->current_index();
275
10.4k
    _current_ordinal = _data_page.first_ordinal + _data_page.offset_in_page;
276
10.4k
    DCHECK(_data_page.contains(_current_ordinal));
277
10.4k
    _seeked = true;
278
10.4k
    return Status::OK();
279
10.4k
}
280
281
716
Status IndexedColumnIterator::next_batch(size_t* n, MutableColumnPtr& dst) {
282
716
    DCHECK(_seeked);
283
716
    if (_current_ordinal == _reader->num_values()) {
284
0
        *n = 0;
285
0
        return Status::OK();
286
0
    }
287
288
716
    size_t remaining = *n;
289
1.49k
    while (remaining > 0) {
290
776
        if (!_data_page.has_remaining()) {
291
            // trying to read next data page
292
60
            if (!_reader->_has_index_page) {
293
0
                break; // no more data page
294
0
            }
295
60
            bool has_next = _current_iter->move_next();
296
60
            if (!has_next) {
297
0
                break; // no more data page
298
0
            }
299
60
            RETURN_IF_ERROR(_read_data_page(_current_iter->current_page_pointer()));
300
60
        }
301
302
776
        size_t rows_to_read = std::min(_data_page.remaining(), remaining);
303
776
        size_t rows_read = rows_to_read;
304
776
        RETURN_IF_ERROR(_data_page.data_decoder->next_batch(&rows_read, dst));
305
776
        DCHECK(rows_to_read == rows_read);
306
307
776
        _data_page.offset_in_page += rows_read;
308
776
        _current_ordinal += rows_read;
309
776
        remaining -= rows_read;
310
776
    }
311
716
    *n -= remaining;
312
716
    _seeked = false;
313
716
    return Status::OK();
314
716
}
315
316
} // namespace segment_v2
317
} // namespace doris