Coverage Report

Created: 2026-04-07 13:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment_prefetcher.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment_prefetcher.h"
19
20
#include <algorithm>
21
#include <ranges>
22
23
#include "common/config.h"
24
#include "common/logging.h"
25
#include "storage/index/ordinal_page_index.h"
26
#include "storage/iterators.h"
27
#include "storage/segment/column_reader.h"
28
29
namespace doris::segment_v2 {
30
31
103k
void SegmentPrefetcher::add_rowids(const rowid_t* rowids, uint32_t num) {
32
103k
    if (ordinal_index == nullptr) {
33
0
        return;
34
0
    }
35
103k
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
36
103k
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
37
103k
    const int num_pages = ordinal_index->_num_pages;
38
232M
    for (uint32_t i = 0; i < num; ++i) {
39
232M
        rowid_t rowid = rowids[i];
40
41
232M
        if (_is_forward) {
42
231M
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
43
160k
                page_idx++;
44
160k
            }
45
46
231M
            const auto& page = pages[page_idx];
47
231M
            size_t page_start_block = _offset_to_block_id(page.offset);
48
231M
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);
49
50
            // If page spans two blocks, assign it to the next block (page_end_block)
51
231M
            size_t block_id =
52
231M
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;
53
54
231M
            if (block_id != last_block_id) {
55
79.3k
                if (last_block_id != static_cast<size_t>(-1)) {
56
496
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
57
496
                }
58
79.3k
                last_block_id = block_id;
59
79.3k
                current_block_first_rowid = rowid;
60
79.3k
            }
61
231M
        } else {
62
            // Backward reading: we need the last rowid in each block as the "first" rowid
63
            // (because when reading backwards, we encounter the largest rowid first)
64
            //
65
            // Strategy: iterate forward through bitmap, but for each block,
66
            // keep updating current_block_first_rowid to the latest (largest) rowid in that block
67
586k
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
68
100
                page_idx++;
69
100
            }
70
586k
            size_t block_id = _offset_to_block_id(pages[page_idx].offset);
71
72
586k
            if (block_id != last_block_id) {
73
493
                if (last_block_id != static_cast<size_t>(-1)) {
74
0
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
75
0
                }
76
493
                last_block_id = block_id;
77
493
            }
78
586k
            current_block_first_rowid = rowid;
79
586k
        }
80
232M
    }
81
103k
}
82
83
776k
void SegmentPrefetcher::build_all_data_blocks() {
84
776k
    if (ordinal_index == nullptr) {
85
0
        return;
86
0
    }
87
776k
    reset_blocks();
88
776k
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
89
776k
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
90
776k
    const int num_pages = ordinal_index->_num_pages;
91
92
776k
    last_block_id = static_cast<size_t>(-1);
93
776k
    current_block_first_rowid = 0;
94
95
1.60M
    for (page_idx = 0; page_idx < num_pages; ++page_idx) {
96
825k
        const auto& page = pages[page_idx];
97
98
825k
        if (_is_forward) {
99
823k
            size_t page_start_block = _offset_to_block_id(page.offset);
100
823k
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);
101
102
            // If page spans two blocks, assign it to the next block (page_end_block)
103
823k
            size_t block_id =
104
823k
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;
105
106
823k
            if (block_id != last_block_id) {
107
775k
                if (last_block_id != static_cast<size_t>(-1)) {
108
479
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
109
479
                }
110
775k
                last_block_id = block_id;
111
775k
                current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
112
775k
            }
113
823k
        } else {
114
            // Backward: use the last ordinal in each block as first_rowid
115
1.58k
            size_t block_id = _offset_to_block_id(page.offset);
116
1.58k
            if (block_id != last_block_id) {
117
887
                if (last_block_id != static_cast<size_t>(-1)) {
118
3
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
119
3
                }
120
887
                last_block_id = block_id;
121
887
            }
122
1.58k
            current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
123
1.58k
        }
124
825k
    }
125
126
    // Add the last block
127
776k
    if (last_block_id != static_cast<size_t>(-1)) {
128
776k
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
129
776k
    }
130
131
    // Reverse for backward reading
132
776k
    if (!_is_forward && !_block_sequence.empty()) {
133
881
        std::ranges::reverse(_block_sequence);
134
881
    }
135
776k
}
136
137
// Builds the block sequences of several prefetchers from a single pass over row_bitmap.
//
// The bitmap is decoded in batches of config::segment_file_cache_consume_rowids_batch_size
// row ids. Every batch is handed to every prefetcher, so the bitmap is walked once no matter
// how many prefetchers are built. Each prefetcher goes through
// begin_build_blocks_by_rowids() -> add_rowids()* -> finish_build_blocks_by_rowids().
void SegmentPrefetcher::build_blocks_by_rowids(const roaring::Roaring& row_bitmap,
                                               const std::vector<SegmentPrefetcher*>& prefetchers) {
    // Nothing to build: skip decoding the bitmap entirely.
    if (prefetchers.empty()) {
        return;
    }

    for (auto* prefetcher : prefetchers) {
        prefetcher->begin_build_blocks_by_rowids();
    }

    // Guard against a misconfigured non-positive batch size. A negative value would turn into
    // an enormous vector size, and zero would silently drop every row id.
    const int batch_size =
            std::max(1, static_cast<int>(config::segment_file_cache_consume_rowids_batch_size));
    std::vector<rowid_t> rowids(static_cast<size_t>(batch_size));

    roaring::api::roaring_uint32_iterator_t iter;
    roaring::api::roaring_init_iterator(&row_bitmap.roaring, &iter);
    for (uint32_t num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size);
         num > 0;
         num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size)) {
        for (auto* prefetcher : prefetchers) {
            prefetcher->add_rowids(rowids.data(), num);
        }
    }

    for (auto* prefetcher : prefetchers) {
        prefetcher->finish_build_blocks_by_rowids();
    }
}
160
161
79.2k
void SegmentPrefetcher::begin_build_blocks_by_rowids() {
162
79.2k
    reset_blocks();
163
79.2k
    page_idx = 0;
164
79.2k
}
165
166
79.2k
void SegmentPrefetcher::finish_build_blocks_by_rowids() {
167
79.2k
    if (ordinal_index == nullptr) {
168
0
        return;
169
0
    }
170
79.2k
    if (last_block_id != static_cast<size_t>(-1)) {
171
79.2k
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
172
79.2k
    }
173
174
79.2k
    if (!_is_forward && !_block_sequence.empty()) {
175
493
        std::ranges::reverse(_block_sequence);
176
493
    }
177
178
18.4E
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
179
18.4E
            "[verbose] SegmentPrefetcher initialized with block count={}, is_forward={}, "
180
18.4E
            "num_pages={}, path={}, blocks: (block_id, first_rowid)=[{}]",
181
18.4E
            _block_sequence.size(), _is_forward, ordinal_index->_num_pages, _path,
182
18.4E
            fmt::join(_block_sequence | std::views::transform([](const auto& b) {
183
0
                          return fmt::format("({}, {})", b.block_id, b.first_rowid);
184
0
                      }),
185
18.4E
                      ","));
186
79.2k
}
187
188
1.70M
void SegmentPrefetcher::reset_blocks() {
189
1.70M
    _block_sequence.clear();
190
1.70M
    _current_block_index = 0;
191
1.70M
    _prefetched_index = -1;
192
1.70M
}
193
194
// Prepares the prefetcher for one column.
//
// Clears any previous block sequence and records the read direction from read_options.
// Records the file name of the column's file, which is used in verbose logs. Finally it
// fetches the column's ordinal index through column_reader, propagating any error. This
// function does not build a block sequence itself.
Status SegmentPrefetcher::init(std::shared_ptr<ColumnReader> column_reader,
                               const StorageReadOptions& read_options) {
    DCHECK(column_reader != nullptr);

    reset_blocks();

    const bool reverse_scan = read_options.read_orderby_key_reverse;
    _is_forward = !reverse_scan;

    const auto& file_path = column_reader->_file_reader->path();
    _path = file_path.filename().native();

    RETURN_IF_ERROR(column_reader->get_ordinal_index_reader(ordinal_index, read_options.stats));
    return Status::OK();
}
205
206
1.42M
// Decides which blocks should be prefetched now that the reader has reached current_rowid.
//
// 1. Moves _current_block_index forward, in read direction, to the block containing
//    current_rowid.
// 2. Appends to *out_ranges the byte ranges of the next blocks that have not been prefetched
//    yet. It never goes behind the current block, and stops once window_size() is no longer
//    below _config.prefetch_window_size.
//
// *out_ranges is cleared on entry, so after a false return it is empty rather than still
// holding whatever the caller left in it. Returns true iff at least one range was appended.
bool SegmentPrefetcher::need_prefetch(rowid_t current_rowid, std::vector<BlockRange>* out_ranges) {
    DCHECK(out_ranges != nullptr);
    // Clear up front so every return path leaves *out_ranges describing exactly this call.
    out_ranges->clear();
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
            << fmt::format("[verbose] SegmentPrefetcher need_prefetch enter current_rowid={}, {}",
                           current_rowid, debug_string());
    // Nothing to do without blocks, or once every block has already been prefetched.
    if (_block_sequence.empty() ||
        _prefetched_index >= static_cast<int>(_block_sequence.size()) - 1) {
        return false;
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher need_prefetch called with current_rowid={}, {}, "
            "block=(id={}, first_rowid={})",
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
            _block_sequence[_current_block_index].first_rowid);

    // Forward: blocks are ordered by increasing first row id, so advance while the next block
    // starts at or before current_rowid.
    // Backward: blocks are ordered by decreasing row id and keyed by a row id taken from the
    // end of the block, so advance while the next block's key is at or above current_rowid.
    if (_is_forward) {
        while (_current_block_index + 1 < _block_sequence.size() &&
               _block_sequence[_current_block_index + 1].first_rowid <= current_rowid) {
            _current_block_index++;
        }
    } else {
        while (_current_block_index + 1 < _block_sequence.size() &&
               _block_sequence[_current_block_index + 1].first_rowid >= current_rowid) {
            _current_block_index++;
        }
    }

    // For non-predicate columns, some rowids in row_bitmap may be filtered out after
    // vectorized evaluation of the predicate columns, so rows in blocks behind the current one
    // may never be read. Never prefetch behind the current block.
    _prefetched_index = std::max(_prefetched_index, _current_block_index - 1);
    // Fill the prefetch window with the next blocks that have not been prefetched yet.
    while (_prefetched_index + 1 < _block_sequence.size() &&
           window_size() < _config.prefetch_window_size) {
        out_ranges->push_back(_block_id_to_range(_block_sequence[++_prefetched_index].block_id));
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher need_prefetch after calc with current_rowid={}, {}, "
            "block=(id={}, first_rowid={})",
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
            _block_sequence[_current_block_index].first_rowid);

    bool triggered = !out_ranges->empty();
    if (triggered) {
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
                "[verbose] SegmentPrefetcher prefetch triggered at rowid={}, {}, prefetch {} "
                "blocks: (offset, size)=[{}]",
                current_rowid, debug_string(), out_ranges->size(),
                fmt::join(*out_ranges | std::views::transform([](const auto& b) {
                    return fmt::format("({}, {})", b.offset, b.size);
                }),
                          ","));
    }
    return triggered;
}
261
262
} // namespace doris::segment_v2