Coverage Report

Created: 2026-03-15 18:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment_prefetcher.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment_prefetcher.h"
19
20
#include <algorithm>
21
#include <ranges>
22
23
#include "common/config.h"
24
#include "common/logging.h"
25
#include "storage/index/ordinal_page_index.h"
26
#include "storage/iterators.h"
27
#include "storage/segment/column_reader.h"
28
29
namespace doris::segment_v2 {
30
31
0
void SegmentPrefetcher::add_rowids(const rowid_t* rowids, uint32_t num) {
32
0
    if (ordinal_index == nullptr) {
33
0
        return;
34
0
    }
35
0
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
36
0
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
37
0
    const int num_pages = ordinal_index->_num_pages;
38
0
    for (uint32_t i = 0; i < num; ++i) {
39
0
        rowid_t rowid = rowids[i];
40
41
0
        if (_is_forward) {
42
0
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
43
0
                page_idx++;
44
0
            }
45
46
0
            const auto& page = pages[page_idx];
47
0
            size_t page_start_block = _offset_to_block_id(page.offset);
48
0
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);
49
50
            // If page spans two blocks, assign it to the next block (page_end_block)
51
0
            size_t block_id =
52
0
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;
53
54
0
            if (block_id != last_block_id) {
55
0
                if (last_block_id != static_cast<size_t>(-1)) {
56
0
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
57
0
                }
58
0
                last_block_id = block_id;
59
0
                current_block_first_rowid = rowid;
60
0
            }
61
0
        } else {
62
            // Backward reading: we need the last rowid in each block as the "first" rowid
63
            // (because when reading backwards, we encounter the largest rowid first)
64
            //
65
            // Strategy: iterate forward through bitmap, but for each block,
66
            // keep updating current_block_first_rowid to the latest (largest) rowid in that block
67
0
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
68
0
                page_idx++;
69
0
            }
70
0
            size_t block_id = _offset_to_block_id(pages[page_idx].offset);
71
72
0
            if (block_id != last_block_id) {
73
0
                if (last_block_id != static_cast<size_t>(-1)) {
74
0
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
75
0
                }
76
0
                last_block_id = block_id;
77
0
            }
78
0
            current_block_first_rowid = rowid;
79
0
        }
80
0
    }
81
0
}
82
83
349k
void SegmentPrefetcher::build_all_data_blocks() {
84
349k
    if (ordinal_index == nullptr) {
85
0
        return;
86
0
    }
87
349k
    reset_blocks();
88
349k
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
89
349k
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
90
349k
    const int num_pages = ordinal_index->_num_pages;
91
92
349k
    last_block_id = static_cast<size_t>(-1);
93
349k
    current_block_first_rowid = 0;
94
95
708k
    for (page_idx = 0; page_idx < num_pages; ++page_idx) {
96
359k
        const auto& page = pages[page_idx];
97
98
359k
        if (_is_forward) {
99
359k
            size_t page_start_block = _offset_to_block_id(page.offset);
100
359k
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);
101
102
            // If page spans two blocks, assign it to the next block (page_end_block)
103
359k
            size_t block_id =
104
359k
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;
105
106
359k
            if (block_id != last_block_id) {
107
349k
                if (last_block_id != static_cast<size_t>(-1)) {
108
40
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
109
40
                }
110
349k
                last_block_id = block_id;
111
349k
                current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
112
349k
            }
113
18.4E
        } else {
114
            // Backward: use the last ordinal in each block as first_rowid
115
18.4E
            size_t block_id = _offset_to_block_id(page.offset);
116
18.4E
            if (block_id != last_block_id) {
117
0
                if (last_block_id != static_cast<size_t>(-1)) {
118
0
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
119
0
                }
120
0
                last_block_id = block_id;
121
0
            }
122
18.4E
            current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
123
18.4E
        }
124
359k
    }
125
126
    // Add the last block
127
349k
    if (last_block_id != static_cast<size_t>(-1)) {
128
349k
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
129
349k
    }
130
131
    // Reverse for backward reading
132
349k
    if (!_is_forward && !_block_sequence.empty()) {
133
0
        std::ranges::reverse(_block_sequence);
134
0
    }
135
349k
}
136
137
// Builds the block plan of every prefetcher from the rowids selected by row_bitmap.
//
// All prefetchers are reset first. The bitmap's rowids are then streamed in batches (ascending
// order, as produced by the roaring iterator) into each prefetcher's add_rowids(). Finally every
// prefetcher flushes its last open block and finalizes its order.
//
// @param row_bitmap  rowids that will be read.
// @param prefetchers prefetchers to build; each must be non-null.
void SegmentPrefetcher::build_blocks_by_rowids(const roaring::Roaring& row_bitmap,
                                               const std::vector<SegmentPrefetcher*>& prefetchers) {
    for (auto* prefetcher : prefetchers) {
        prefetcher->begin_build_blocks_by_rowids();
    }

    // The batch size comes from configuration, so clamp it to at least 1. A zero value would
    // make every iterator read return 0 and silently skip all rowids. A negative value would be
    // converted into an enormous std::vector size.
    const int batch_size =
            std::max(1, static_cast<int>(config::segment_file_cache_consume_rowids_batch_size));
    std::vector<rowid_t> rowids(batch_size);
    roaring::api::roaring_uint32_iterator_t iter;
    roaring::api::roaring_init_iterator(&row_bitmap.roaring, &iter);
    uint32_t num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size);

    for (; num > 0;
         num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size)) {
        for (auto* prefetcher : prefetchers) {
            prefetcher->add_rowids(rowids.data(), num);
        }
    }

    for (auto* prefetcher : prefetchers) {
        prefetcher->finish_build_blocks_by_rowids();
    }
}
160
161
0
void SegmentPrefetcher::begin_build_blocks_by_rowids() {
162
0
    reset_blocks();
163
0
    page_idx = 0;
164
0
}
165
166
0
void SegmentPrefetcher::finish_build_blocks_by_rowids() {
167
0
    if (ordinal_index == nullptr) {
168
0
        return;
169
0
    }
170
0
    if (last_block_id != static_cast<size_t>(-1)) {
171
0
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
172
0
    }
173
174
0
    if (!_is_forward && !_block_sequence.empty()) {
175
0
        std::ranges::reverse(_block_sequence);
176
0
    }
177
178
0
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
179
0
            "[verbose] SegmentPrefetcher initialized with block count={}, is_forward={}, "
180
0
            "num_pages={}, path={}, blocks: (block_id, first_rowid)=[{}]",
181
0
            _block_sequence.size(), _is_forward, ordinal_index->_num_pages, _path,
182
0
            fmt::join(_block_sequence | std::views::transform([](const auto& b) {
183
0
                          return fmt::format("({}, {})", b.block_id, b.first_rowid);
184
0
                      }),
185
0
                      ","));
186
0
}
187
188
698k
void SegmentPrefetcher::reset_blocks() {
189
698k
    _block_sequence.clear();
190
698k
    _current_block_index = 0;
191
698k
    _prefetched_index = -1;
192
698k
}
193
194
// Binds this prefetcher to a column and its read options.
//
// Clears any previous block plan and picks the scan direction from
// read_options.read_orderby_key_reverse. It stores the column file's name (used, e.g., in the
// verbose log) and loads the column's ordinal index reader into ordinal_index.
//
// @param column_reader column to prefetch for; must be non-null (DCHECKed).
// @param read_options  scan options; read_orderby_key_reverse and stats are consulted.
// @return the error from get_ordinal_index_reader(), or OK.
Status SegmentPrefetcher::init(std::shared_ptr<ColumnReader> column_reader,
                               const StorageReadOptions& read_options) {
    DCHECK(column_reader != nullptr);

    reset_blocks();

    const bool reverse_scan = read_options.read_orderby_key_reverse;
    _is_forward = !reverse_scan;

    const auto& file_path = column_reader->_file_reader->path();
    _path = file_path.filename().native();

    RETURN_IF_ERROR(column_reader->get_ordinal_index_reader(ordinal_index, read_options.stats));
    return Status::OK();
}
205
206
524k
// Decides which blocks, if any, should be prefetched now that reading has reached current_rowid.
//
// The current-block cursor is first advanced over planned blocks already reached:
//   - forward:  when the next block's first_rowid <= current_rowid;
//   - backward: when the next block's first_rowid >= current_rowid.
// Then blocks after the last prefetched one are appended to *out_ranges (never behind the
// cursor) while window_size() stays below _config.prefetch_window_size.
//
// @param current_rowid rowid the reader is about to read.
// @param out_ranges    receives the ranges to prefetch; cleared before filling. Must be non-null.
// @return true iff at least one range was produced. Returns false early, without touching
//         *out_ranges, when nothing is planned or every planned block was already handed out.
bool SegmentPrefetcher::need_prefetch(rowid_t current_rowid, std::vector<BlockRange>* out_ranges) {
    DCHECK(out_ranges != nullptr);
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
            << fmt::format("[verbose] SegmentPrefetcher need_prefetch enter current_rowid={}, {}",
                           current_rowid, debug_string());

    const bool nothing_left =
            _block_sequence.empty() ||
            _prefetched_index >= static_cast<int>(_block_sequence.size()) - 1;
    if (nothing_left) {
        return false;
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher need_prefetch called with current_rowid={}, {}, "
            "block=(id={}, first_rowid={})",
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
            _block_sequence[_current_block_index].first_rowid);

    // Move the cursor onto the block that current_rowid has reached.
    while (_current_block_index + 1 < _block_sequence.size()) {
        const auto next_first_rowid = _block_sequence[_current_block_index + 1].first_rowid;
        const bool reached_next = _is_forward ? (next_first_rowid <= current_rowid)
                                              : (next_first_rowid >= current_rowid);
        if (!reached_next) {
            break;
        }
        _current_block_index++;
    }

    out_ranges->clear();
    // For non-predicate columns, rows in earlier blocks may already have been filtered out by
    // predicate evaluation, so prefetching never starts behind the current block.
    _prefetched_index = std::max(_prefetched_index, _current_block_index - 1);
    while (_prefetched_index + 1 < _block_sequence.size() &&
           window_size() < _config.prefetch_window_size) {
        ++_prefetched_index;
        const auto next_block_id = _block_sequence[_prefetched_index].block_id;
        out_ranges->push_back(_block_id_to_range(next_block_id));
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher need_prefetch after calc with current_rowid={}, {}, "
            "block=(id={}, first_rowid={})",
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
            _block_sequence[_current_block_index].first_rowid);

    if (out_ranges->empty()) {
        return false;
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher prefetch triggered at rowid={}, {}, prefetch {} "
            "blocks: (offset, size)=[{}]",
            current_rowid, debug_string(), out_ranges->size(),
            fmt::join(*out_ranges | std::views::transform([](const auto& b) {
                return fmt::format("({}, {})", b.offset, b.size);
            }),
                      ","));
    return true;
}
261
262
} // namespace doris::segment_v2