Coverage Report

Created: 2026-03-19 11:17

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment_prefetcher.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment_prefetcher.h"
19
20
#include <algorithm>
21
#include <ranges>
22
23
#include "common/config.h"
24
#include "common/logging.h"
25
#include "storage/index/ordinal_page_index.h"
26
#include "storage/iterators.h"
27
#include "storage/segment/column_reader.h"
28
29
namespace doris::segment_v2 {
30
31
100k
void SegmentPrefetcher::add_rowids(const rowid_t* rowids, uint32_t num) {
32
100k
    if (ordinal_index == nullptr) {
33
0
        return;
34
0
    }
35
100k
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
36
100k
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
37
100k
    const int num_pages = ordinal_index->_num_pages;
38
192M
    for (uint32_t i = 0; i < num; ++i) {
39
191M
        rowid_t rowid = rowids[i];
40
41
191M
        if (_is_forward) {
42
191M
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
43
133k
                page_idx++;
44
133k
            }
45
46
191M
            const auto& page = pages[page_idx];
47
191M
            size_t page_start_block = _offset_to_block_id(page.offset);
48
191M
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);
49
50
            // If page spans two blocks, assign it to the next block (page_end_block)
51
191M
            size_t block_id =
52
191M
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;
53
54
191M
            if (block_id != last_block_id) {
55
69.5k
                if (last_block_id != static_cast<size_t>(-1)) {
56
445
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
57
445
                }
58
69.5k
                last_block_id = block_id;
59
69.5k
                current_block_first_rowid = rowid;
60
69.5k
            }
61
191M
        } else {
62
            // Backward reading: we need the last rowid in each block as the "first" rowid
63
            // (because when reading backwards, we encounter the largest rowid first)
64
            //
65
            // Strategy: iterate forward through bitmap, but for each block,
66
            // keep updating current_block_first_rowid to the latest (largest) rowid in that block
67
311k
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
68
297
                page_idx++;
69
297
            }
70
311k
            size_t block_id = _offset_to_block_id(pages[page_idx].offset);
71
72
311k
            if (block_id != last_block_id) {
73
371
                if (last_block_id != static_cast<size_t>(-1)) {
74
0
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
75
0
                }
76
371
                last_block_id = block_id;
77
371
            }
78
311k
            current_block_first_rowid = rowid;
79
311k
        }
80
191M
    }
81
100k
}
82
83
822k
void SegmentPrefetcher::build_all_data_blocks() {
84
822k
    if (ordinal_index == nullptr) {
85
0
        return;
86
0
    }
87
822k
    reset_blocks();
88
822k
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
89
822k
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
90
822k
    const int num_pages = ordinal_index->_num_pages;
91
92
822k
    last_block_id = static_cast<size_t>(-1);
93
822k
    current_block_first_rowid = 0;
94
95
1.69M
    for (page_idx = 0; page_idx < num_pages; ++page_idx) {
96
873k
        const auto& page = pages[page_idx];
97
98
873k
        if (_is_forward) {
99
872k
            size_t page_start_block = _offset_to_block_id(page.offset);
100
872k
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);
101
102
            // If page spans two blocks, assign it to the next block (page_end_block)
103
872k
            size_t block_id =
104
872k
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;
105
106
872k
            if (block_id != last_block_id) {
107
823k
                if (last_block_id != static_cast<size_t>(-1)) {
108
631
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
109
631
                }
110
823k
                last_block_id = block_id;
111
823k
                current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
112
823k
            }
113
872k
        } else {
114
            // Backward: use the last ordinal in each block as first_rowid
115
396
            size_t block_id = _offset_to_block_id(page.offset);
116
661
            if (block_id != last_block_id) {
117
661
                if (last_block_id != static_cast<size_t>(-1)) {
118
0
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
119
0
                }
120
661
                last_block_id = block_id;
121
661
            }
122
396
            current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
123
396
        }
124
873k
    }
125
126
    // Add the last block
127
823k
    if (last_block_id != static_cast<size_t>(-1)) {
128
823k
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
129
823k
    }
130
131
    // Reverse for backward reading
132
822k
    if (!_is_forward && !_block_sequence.empty()) {
133
661
        std::ranges::reverse(_block_sequence);
134
661
    }
135
822k
}
136
137
// Builds the block sequence of every prefetcher in `prefetchers` from the rows set in
// `row_bitmap`.
//
// The bitmap is decoded once, in batches, and each decoded batch is handed to every prefetcher.
// This avoids iterating the bitmap separately per prefetcher.
//
// row_bitmap:  rows that are going to be read.
// prefetchers: prefetchers to (re)build; each one gets begin_build_blocks_by_rowids(), then
//              add_rowids() once per batch, then finish_build_blocks_by_rowids().
void SegmentPrefetcher::build_blocks_by_rowids(const roaring::Roaring& row_bitmap,
                                               const std::vector<SegmentPrefetcher*>& prefetchers) {
    for (auto* prefetcher : prefetchers) {
        prefetcher->begin_build_blocks_by_rowids();
    }

    // The batch size comes straight from configuration. A value of 0 would make the very first
    // read return 0 rows, so no row id would ever reach the prefetchers; a negative value would
    // be converted to an enormous buffer size. Fall back to the smallest usable batch instead.
    int batch_size = config::segment_file_cache_consume_rowids_batch_size;
    if (batch_size < 1) {
        batch_size = 1;
    }

    std::vector<rowid_t> rowids(static_cast<size_t>(batch_size));
    roaring::api::roaring_uint32_iterator_t iter;
    roaring::api::roaring_init_iterator(&row_bitmap.roaring, &iter);
    uint32_t num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size);

    // A read that yields 0 values means the iterator is exhausted.
    for (; num > 0;
         num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size)) {
        for (auto* prefetcher : prefetchers) {
            prefetcher->add_rowids(rowids.data(), num);
        }
    }

    for (auto* prefetcher : prefetchers) {
        prefetcher->finish_build_blocks_by_rowids();
    }
}
160
161
69.3k
void SegmentPrefetcher::begin_build_blocks_by_rowids() {
162
69.3k
    reset_blocks();
163
69.3k
    page_idx = 0;
164
69.3k
}
165
166
69.3k
void SegmentPrefetcher::finish_build_blocks_by_rowids() {
167
69.3k
    if (ordinal_index == nullptr) {
168
0
        return;
169
0
    }
170
69.3k
    if (last_block_id != static_cast<size_t>(-1)) {
171
69.3k
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
172
69.3k
    }
173
174
69.3k
    if (!_is_forward && !_block_sequence.empty()) {
175
371
        std::ranges::reverse(_block_sequence);
176
371
    }
177
178
18.4E
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
179
18.4E
            "[verbose] SegmentPrefetcher initialized with block count={}, is_forward={}, "
180
18.4E
            "num_pages={}, path={}, blocks: (block_id, first_rowid)=[{}]",
181
18.4E
            _block_sequence.size(), _is_forward, ordinal_index->_num_pages, _path,
182
18.4E
            fmt::join(_block_sequence | std::views::transform([](const auto& b) {
183
0
                          return fmt::format("({}, {})", b.block_id, b.first_rowid);
184
0
                      }),
185
18.4E
                      ","));
186
69.3k
}
187
188
1.78M
void SegmentPrefetcher::reset_blocks() {
189
1.78M
    _block_sequence.clear();
190
1.78M
    _current_block_index = 0;
191
1.78M
    _prefetched_index = -1;
192
1.78M
}
193
194
// Binds this prefetcher to a column and a read direction.
//
// column_reader: reader of the column to prefetch for; must not be null (checked with DCHECK).
// read_options:  supplies the read direction (read_orderby_key_reverse) and the statistics
//                object passed on when fetching the ordinal index reader.
//
// Returns the error reported by get_ordinal_index_reader(), otherwise OK. The block sequence is
// empty afterwards; one of the build functions has to be called to populate it.
Status SegmentPrefetcher::init(std::shared_ptr<ColumnReader> column_reader,
                               const StorageReadOptions& read_options) {
    DCHECK(column_reader != nullptr);

    reset_blocks();

    // Only the file name (without directories) is kept; it is used in log messages.
    const auto file_name = column_reader->_file_reader->path().filename();
    _path = file_name.native();
    _is_forward = !read_options.read_orderby_key_reverse;

    RETURN_IF_ERROR(column_reader->get_ordinal_index_reader(ordinal_index, read_options.stats));
    return Status::OK();
}
205
206
1.52M
// Decides which blocks should be prefetched now that the reader has reached `current_rowid`.
//
// current_rowid: row id the reader is currently positioned at.
// out_ranges:    must not be null (checked with DCHECK). On a `true` return it holds the byte
//                ranges of the blocks to prefetch. It is cleared before being filled, but it is
//                left untouched when the function returns early because there is no block
//                sequence or every block has already been handed out.
//
// Returns true if at least one range was written to `out_ranges`.
bool SegmentPrefetcher::need_prefetch(rowid_t current_rowid, std::vector<BlockRange>* out_ranges) {
    DCHECK(out_ranges != nullptr);
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
            << fmt::format("[verbose] SegmentPrefetcher need_prefetch enter current_rowid={}, {}",
                           current_rowid, debug_string());

    // Nothing to do when no sequence was built or its last block was already handed out.
    const bool all_blocks_issued =
            _prefetched_index >= static_cast<int>(_block_sequence.size()) - 1;
    if (_block_sequence.empty() || all_blocks_issued) {
        return false;
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher need_prefetch called with current_rowid={}, {}, "
            "block=(id={}, first_rowid={})",
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
            _block_sequence[_current_block_index].first_rowid);

    // Tells whether the reader has already reached a block, given the row id recorded for it.
    // Forward scans move towards larger row ids, backward scans towards smaller ones.
    const auto reader_reached = [&](const auto& first_rowid) {
        return _is_forward ? (first_rowid <= current_rowid) : (first_rowid >= current_rowid);
    };
    while (_current_block_index + 1 < _block_sequence.size() &&
           reader_reached(_block_sequence[_current_block_index + 1].first_rowid)) {
        _current_block_index++;
    }

    out_ranges->clear();
    // For a non-predicate column some row ids of the bitmap may already have been filtered out
    // by the vectorized evaluation of the predicate columns. Blocks the reader has skipped past
    // must not be prefetched, so the prefetch cursor is pulled up to just before the current
    // block.
    _prefetched_index = std::max(_prefetched_index, _current_block_index - 1);
    while (_prefetched_index + 1 < _block_sequence.size() &&
           window_size() < _config.prefetch_window_size) {
        ++_prefetched_index;
        out_ranges->push_back(_block_id_to_range(_block_sequence[_prefetched_index].block_id));
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher need_prefetch after calc with current_rowid={}, {}, "
            "block=(id={}, first_rowid={})",
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
            _block_sequence[_current_block_index].first_rowid);

    const bool triggered = !out_ranges->empty();
    if (triggered) {
        // Renders a single byte range for the verbose log line below.
        const auto describe_range = [](const auto& b) {
            return fmt::format("({}, {})", b.offset, b.size);
        };
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
                "[verbose] SegmentPrefetcher prefetch triggered at rowid={}, {}, prefetch {} "
                "blocks: (offset, size)=[{}]",
                current_rowid, debug_string(), out_ranges->size(),
                fmt::join(*out_ranges | std::views::transform(describe_range), ","));
    }
    return triggered;
}
261
262
} // namespace doris::segment_v2