Coverage Report

Created: 2026-06-02 02:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/index/indexed_column_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/index/indexed_column_reader.h"
19
20
#include <gen_cpp/segment_v2.pb.h>
21
22
#include <algorithm>
23
24
#include "common/status.h"
25
#include "io/io_common.h"
26
#include "storage/key_coder.h"
27
#include "storage/olap_common.h"
28
#include "storage/segment/encoding_info.h" // for EncodingInfo
29
#include "storage/segment/options.h"
30
#include "storage/segment/page_decoder.h"
31
#include "storage/segment/page_io.h"
32
#include "storage/types.h"
33
#include "util/block_compression.h"
34
#include "util/bvar_helper.h"
35
36
namespace doris {
37
using namespace ErrorCode;
38
namespace segment_v2 {
39
40
static bvar::Adder<uint64_t> g_index_reader_bytes("doris_pk", "index_reader_bytes");
41
static bvar::Adder<uint64_t> g_index_reader_compressed_bytes("doris_pk",
42
                                                             "index_reader_compressed_bytes");
43
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_bytes_per_second(
44
        "doris_pk", "index_reader_bytes_per_second", &g_index_reader_bytes, 60);
45
static bvar::Adder<uint64_t> g_index_reader_pages("doris_pk", "index_reader_pages");
46
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_pages_per_second(
47
        "doris_pk", "index_reader_pages_per_second", &g_index_reader_pages, 60);
48
static bvar::Adder<uint64_t> g_index_reader_cached_pages("doris_pk", "index_reader_cached_pages");
49
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_cached_pages_per_second(
50
        "doris_pk", "index_reader_cached_pages_per_second", &g_index_reader_cached_pages, 60);
51
static bvar::Adder<uint64_t> g_index_reader_seek_count("doris_pk", "index_reader_seek_count");
52
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_seek_per_second(
53
        "doris_pk", "index_reader_seek_per_second", &g_index_reader_seek_count, 60);
54
static bvar::Adder<uint64_t> g_index_reader_pk_pages("doris_pk", "index_reader_pk_pages");
55
static bvar::PerSecond<bvar::Adder<uint64_t>> g_index_reader_pk_bytes_per_second(
56
        "doris_pk", "index_reader_pk_pages_per_second", &g_index_reader_pk_pages, 60);
57
58
185
int64_t IndexedColumnReader::get_metadata_size() const {
59
185
    return sizeof(IndexedColumnReader) + _meta.ByteSizeLong();
60
185
}
61
62
Status IndexedColumnReader::load(bool use_page_cache, bool kept_in_memory,
63
185
                                 OlapReaderStatistics* index_load_stats) {
64
185
    _use_page_cache = use_page_cache;
65
185
    _kept_in_memory = kept_in_memory;
66
67
185
    _type = (FieldType)_meta.data_type();
68
185
    if (!is_scalar_type(_type)) {
69
0
        return Status::NotSupported("unsupported typeinfo, type={}", _meta.data_type());
70
0
    }
71
185
    RETURN_IF_ERROR(EncodingInfo::get(_type, _meta.encoding(), &_encoding_info));
72
185
    _value_key_coder = get_key_coder(_type);
73
74
    // read and parse ordinal index page when exists
75
185
    if (_meta.has_ordinal_index_meta()) {
76
185
        if (_meta.ordinal_index_meta().is_root_data_page()) {
77
164
            _sole_data_page = PagePointer(_meta.ordinal_index_meta().root_page());
78
164
        } else {
79
21
            RETURN_IF_ERROR(load_index_page(_meta.ordinal_index_meta().root_page(),
80
21
                                            &_ordinal_index_page_handle,
81
21
                                            _ordinal_index_reader.get(), index_load_stats));
82
21
            _has_index_page = true;
83
21
        }
84
185
    }
85
86
    // read and parse value index page when exists
87
185
    if (_meta.has_value_index_meta()) {
88
126
        if (_meta.value_index_meta().is_root_data_page()) {
89
105
            _sole_data_page = PagePointer(_meta.value_index_meta().root_page());
90
105
        } else {
91
21
            RETURN_IF_ERROR(load_index_page(_meta.value_index_meta().root_page(),
92
21
                                            &_value_index_page_handle, _value_index_reader.get(),
93
21
                                            index_load_stats));
94
21
            _has_index_page = true;
95
21
        }
96
126
    }
97
185
    _num_values = _meta.num_values();
98
99
185
    update_metadata_size();
100
185
    return Status::OK();
101
185
}
102
103
Status IndexedColumnReader::load_index_page(const PagePointerPB& pp, PageHandle* handle,
104
                                            IndexPageReader* reader,
105
42
                                            OlapReaderStatistics* index_load_stats) {
106
42
    Slice body;
107
42
    PageFooterPB footer;
108
42
    BlockCompressionCodec* local_compress_codec;
109
42
    RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &local_compress_codec));
110
42
    RETURN_IF_ERROR(read_page(PagePointer(pp), handle, &body, &footer, INDEX_PAGE,
111
42
                              local_compress_codec, false, index_load_stats));
112
42
    RETURN_IF_ERROR(reader->parse(body, footer.index_page_footer()));
113
42
    _mem_size += body.get_size();
114
42
    return Status::OK();
115
42
}
116
117
Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, Slice* body,
118
                                      PageFooterPB* footer, PageTypePB type,
119
                                      BlockCompressionCodec* codec, bool pre_decode,
120
304
                                      OlapReaderStatistics* stats) const {
121
304
    OlapReaderStatistics tmp_stats;
122
304
    OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
123
304
    PageReadOptions opts(io::IOContext {.is_index_data = true,
124
304
                                        .file_cache_stats = &stats_ptr->file_cache_stats,
125
304
                                        .table_name = "",
126
304
                                        .partition_name = ""});
127
304
    opts.use_page_cache = _use_page_cache;
128
304
    opts.kept_in_memory = _kept_in_memory;
129
304
    opts.pre_decode = pre_decode;
130
304
    opts.type = type;
131
304
    opts.file_reader = _file_reader.get();
132
304
    opts.page_pointer = pp;
133
304
    opts.codec = codec;
134
304
    opts.stats = stats_ptr;
135
304
    opts.encoding_info = _encoding_info;
136
137
304
    if (_is_pk_index) {
138
176
        opts.type = PRIMARY_KEY_INDEX_PAGE;
139
176
    }
140
304
    auto st = PageIO::read_and_decompress_page(opts, handle, body, footer);
141
304
    g_index_reader_compressed_bytes << pp.size;
142
304
    g_index_reader_bytes << footer->uncompressed_size();
143
304
    g_index_reader_pages << 1;
144
304
    g_index_reader_cached_pages << tmp_stats.cached_pages_num;
145
304
    return st;
146
304
}
147
148
185
IndexedColumnReader::~IndexedColumnReader() = default;
149
150
///////////////////////////////////////////////////////////////////////////////
151
152
262
Status IndexedColumnIterator::_read_data_page(const PagePointer& pp) {
153
262
    Status status;
154
    // there is not init() for IndexedColumnIterator, so do it here
155
262
    if (!_compress_codec) {
156
191
        RETURN_IF_ERROR(get_block_compression_codec(_reader->get_compression(), &_compress_codec));
157
191
    }
158
159
262
    PageHandle handle;
160
262
    Slice body;
161
262
    PageFooterPB footer;
162
262
    RETURN_IF_ERROR(_reader->read_page(pp, &handle, &body, &footer, DATA_PAGE, _compress_codec,
163
262
                                       true, _stats));
164
    // parse data page
165
    // note that page_index is not used in IndexedColumnIterator, so we pass 0
166
262
    PageDecoderOptions opts;
167
262
    opts.need_check_bitmap = false;
168
262
    status = ParsedPage::create(std::move(handle), body, footer.data_page_footer(),
169
262
                                _reader->encoding_info(), pp, 0, &_data_page, opts);
170
262
    if (!status.ok()) {
171
0
        LOG(WARNING) << "failed to create ParsedPage in IndexedColumnIterator, file="
172
0
                     << _reader->_file_reader->path().native() << ", page_offset=" << pp.offset
173
0
                     << ", page_size=" << pp.size << ", error=" << status;
174
0
    }
175
262
    DCHECK(_reader->_meta.ordinal_index_meta().is_root_data_page()
176
262
                   ? _reader->_meta.num_values() == _data_page.num_rows
177
262
                   : true);
178
262
    return status;
179
262
}
180
181
358
Status IndexedColumnIterator::seek_to_ordinal(ordinal_t idx) {
182
358
    DCHECK(idx <= _reader->num_values());
183
184
358
    if (!_reader->support_ordinal_seek()) {
185
0
        return Status::NotSupported("no ordinal index");
186
0
    }
187
188
    // it's ok to seek past the last value
189
358
    if (idx == _reader->num_values()) {
190
1
        _current_ordinal = idx;
191
1
        _seeked = true;
192
1
        return Status::OK();
193
1
    }
194
195
357
    if (!_data_page || !_data_page.contains(idx)) {
196
        // need to read the data page containing row at idx
197
194
        if (_reader->_has_index_page) {
198
38
            std::string key;
199
38
            KeyCoderTraits<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>::full_encode_ascending(&idx,
200
38
                                                                                              &key);
201
38
            RETURN_IF_ERROR(_ordinal_iter.seek_at_or_before(key));
202
38
            RETURN_IF_ERROR(_read_data_page(_ordinal_iter.current_page_pointer()));
203
38
            _current_iter = &_ordinal_iter;
204
156
        } else {
205
156
            RETURN_IF_ERROR(_read_data_page(_reader->_sole_data_page));
206
156
        }
207
194
    }
208
209
357
    ordinal_t offset_in_page = idx - _data_page.first_ordinal;
210
357
    RETURN_IF_ERROR(_data_page.data_decoder->seek_to_position_in_page(offset_in_page));
211
357
    DCHECK(offset_in_page == _data_page.data_decoder->current_index());
212
357
    _data_page.offset_in_page = offset_in_page;
213
357
    _current_ordinal = idx;
214
357
    _seeked = true;
215
357
    return Status::OK();
216
357
}
217
218
5.23k
Status IndexedColumnIterator::seek_at_or_after(const void* key, bool* exact_match) {
219
5.23k
    if (!_reader->support_value_seek()) {
220
0
        return Status::NotSupported("no value index");
221
0
    }
222
223
5.23k
    if (_reader->num_values() == 0) {
224
0
        return Status::Error<ErrorCode::ENTRY_NOT_FOUND>("value index is empty ");
225
0
    }
226
227
5.23k
    g_index_reader_seek_count << 1;
228
229
5.23k
    bool load_data_page = false;
230
5.23k
    PagePointer data_page_pp;
231
5.23k
    if (_reader->_has_index_page) {
232
        // seek index to determine the data page to seek
233
102
        std::string encoded_key;
234
102
        _reader->_value_key_coder->full_encode_ascending(key, &encoded_key);
235
102
        Status st = _value_iter.seek_at_or_before(encoded_key);
236
102
        if (st.is<ENTRY_NOT_FOUND>()) {
237
            // all keys in page is greater than `encoded_key`, point to the first page.
238
            // otherwise, we may missing some pages.
239
            // For example, the predicate is `col1 > 2`, and the index page is [3,5,7].
240
            // so the `seek_at_or_before(2)` will return Status::Error<ENTRY_NOT_FOUND>().
241
            // But actually, we expect it to point to page `3`.
242
0
            _value_iter.seek_to_first();
243
102
        } else if (!st.ok()) {
244
0
            return st;
245
0
        }
246
102
        data_page_pp = _value_iter.current_page_pointer();
247
102
        _current_iter = &_value_iter;
248
102
        if (!_data_page || _data_page.page_pointer != data_page_pp) {
249
            // load when it's not the same with the current
250
24
            load_data_page = true;
251
24
        }
252
5.13k
    } else if (!_data_page) {
253
        // no index page, load data page for the first time
254
14
        load_data_page = true;
255
14
        data_page_pp = PagePointer(_reader->_sole_data_page);
256
14
    }
257
258
5.23k
    if (load_data_page) {
259
38
        RETURN_IF_ERROR(_read_data_page(data_page_pp));
260
38
    }
261
262
    // seek inside data page
263
5.23k
    Status st = _data_page.data_decoder->seek_at_or_after_value(key, exact_match);
264
    // return the first row of next page when not found
265
5.23k
    if (st.is<ENTRY_NOT_FOUND>() && _reader->_has_index_page) {
266
4
        if (_value_iter.has_next()) {
267
3
            _seeked = true;
268
3
            *exact_match = false;
269
3
            _current_ordinal = _data_page.first_ordinal + _data_page.num_rows;
270
            // move offset to the end of the page
271
3
            _data_page.offset_in_page = _data_page.num_rows;
272
3
            return Status::OK();
273
3
        }
274
4
    }
275
5.23k
    RETURN_IF_ERROR(st);
276
5.22k
    _data_page.offset_in_page = _data_page.data_decoder->current_index();
277
5.22k
    _current_ordinal = _data_page.first_ordinal + _data_page.offset_in_page;
278
5.22k
    DCHECK(_data_page.contains(_current_ordinal));
279
5.22k
    _seeked = true;
280
5.22k
    return Status::OK();
281
5.23k
}
282
283
358
Status IndexedColumnIterator::next_batch(size_t* n, MutableColumnPtr& dst) {
284
358
    DCHECK(_seeked);
285
358
    if (_current_ordinal == _reader->num_values()) {
286
0
        *n = 0;
287
0
        return Status::OK();
288
0
    }
289
290
358
    size_t remaining = *n;
291
746
    while (remaining > 0) {
292
388
        if (!_data_page.has_remaining()) {
293
            // trying to read next data page
294
30
            if (!_reader->_has_index_page) {
295
0
                break; // no more data page
296
0
            }
297
30
            bool has_next = _current_iter->move_next();
298
30
            if (!has_next) {
299
0
                break; // no more data page
300
0
            }
301
30
            RETURN_IF_ERROR(_read_data_page(_current_iter->current_page_pointer()));
302
30
        }
303
304
388
        size_t rows_to_read = std::min(_data_page.remaining(), remaining);
305
388
        size_t rows_read = rows_to_read;
306
388
        RETURN_IF_ERROR(_data_page.data_decoder->next_batch(&rows_read, dst));
307
388
        DCHECK(rows_to_read == rows_read);
308
309
388
        _data_page.offset_in_page += rows_read;
310
388
        _current_ordinal += rows_read;
311
388
        remaining -= rows_read;
312
388
    }
313
358
    *n -= remaining;
314
358
    _seeked = false;
315
358
    return Status::OK();
316
358
}
317
318
} // namespace segment_v2
319
} // namespace doris