Coverage Report

Created: 2026-03-27 16:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment_prefetcher.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment_prefetcher.h"
19
20
#include <algorithm>
21
#include <ranges>
22
23
#include "common/config.h"
24
#include "common/logging.h"
25
#include "storage/index/ordinal_page_index.h"
26
#include "storage/iterators.h"
27
#include "storage/segment/column_reader.h"
28
29
namespace doris::segment_v2 {
30
31
134k
// Consumes one batch of `num` row ids and extends the block sequence under construction.
//
// For each rowid the page cursor (page_idx) is advanced while the next page's first ordinal
// is <= rowid, and the resulting page is mapped to a block id with _offset_to_block_id().
// Whenever that block id differs from last_block_id, the previous block (if any) is appended
// to _block_sequence together with current_block_first_rowid. The block still in progress
// when the batch ends is NOT appended here; finish_build_blocks_by_rowids() does that.
//
// page_idx, last_block_id and current_block_first_rowid are not locals, so their values
// carry over from one call to the next. page_idx only ever moves forward in this function;
// presumably rowids must arrive in ascending order across calls - TODO confirm with caller.
//
// Does nothing when ordinal_index is null.
// NOTE(review): pages[page_idx] is indexed without checking num_pages > 0 - confirm the
// ordinal index can never be empty when this is called.
void SegmentPrefetcher::add_rowids(const rowid_t* rowids, uint32_t num) {
32
134k
    if (ordinal_index == nullptr) {
33
0
        return;
34
0
    }
35
134k
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
36
134k
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
37
134k
    const int num_pages = ordinal_index->_num_pages;
38
242M
    for (uint32_t i = 0; i < num; ++i) {
39
241M
        rowid_t rowid = rowids[i];
40
41
241M
        if (_is_forward) {
42
239M
            // Move to the last page whose first ordinal is <= rowid (stops at the final page).
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
43
159k
                page_idx++;
44
159k
            }
45
46
239M
            const auto& page = pages[page_idx];
47
239M
            size_t page_start_block = _offset_to_block_id(page.offset);
48
239M
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);
49
50
            // If page spans two blocks, assign it to the next block (page_end_block)
51
239M
            size_t block_id =
52
239M
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;
53
54
239M
            if (block_id != last_block_id) {
55
95.8k
                // static_cast<size_t>(-1) marks "no block seen yet"; nothing to flush then.
                if (last_block_id != static_cast<size_t>(-1)) {
56
312
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
57
312
                }
58
95.8k
                last_block_id = block_id;
59
95.8k
                // Forward: remember the first rowid that landed in the new block.
                current_block_first_rowid = rowid;
60
95.8k
            }
61
239M
        } else {
62
            // Backward reading: we need the last rowid in each block as the "first" rowid
63
            // (because when reading backwards, we encounter the largest rowid first)
64
            //
65
            // Strategy: iterate forward through bitmap, but for each block,
66
            // keep updating current_block_first_rowid to the latest (largest) rowid in that block
67
2.84M
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
68
304
                page_idx++;
69
304
            }
70
2.84M
            // Unlike the forward branch, only the page's starting offset selects the block.
            size_t block_id = _offset_to_block_id(pages[page_idx].offset);
71
72
2.84M
            if (block_id != last_block_id) {
73
2.76k
                if (last_block_id != static_cast<size_t>(-1)) {
74
0
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
75
0
                }
76
2.76k
                last_block_id = block_id;
77
2.76k
            }
78
2.84M
            // Overwritten for every rowid, so at a block switch (above) it still holds the
            // last rowid that fell into the previous block.
            current_block_first_rowid = rowid;
79
2.84M
        }
80
241M
    }
81
134k
}
82
83
833k
// Rebuilds _block_sequence from every page listed in the ordinal index.
//
// Existing block state is dropped via reset_blocks(), then the pages are walked in index
// order. Each page is mapped to a block id and one (block id, first_rowid) entry is recorded
// per run of consecutive pages that share the same block id:
//   - forward:  a page whose byte range ends in a different block than it starts in is
//               attributed to the end block; first_rowid is the first ordinal of the first
//               page of the run.
//   - backward: only the page's start offset selects the block; first_rowid is the first
//               ordinal of the last page of the run, and the finished sequence is reversed.
//
// Returns without touching any state (reset_blocks() is not reached) when ordinal_index
// is null.
void SegmentPrefetcher::build_all_data_blocks() {
84
833k
    if (ordinal_index == nullptr) {
85
0
        return;
86
0
    }
87
833k
    reset_blocks();
88
833k
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
89
833k
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
90
833k
    const int num_pages = ordinal_index->_num_pages;
91
92
833k
    // static_cast<size_t>(-1) is the "no block seen yet" marker tested below.
    last_block_id = static_cast<size_t>(-1);
93
833k
    current_block_first_rowid = 0;
94
95
1.71M
    for (page_idx = 0; page_idx < num_pages; ++page_idx) {
96
885k
        const auto& page = pages[page_idx];
97
98
885k
        if (_is_forward) {
99
884k
            size_t page_start_block = _offset_to_block_id(page.offset);
100
884k
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);
101
102
            // If page spans two blocks, assign it to the next block (page_end_block)
103
884k
            size_t block_id =
104
884k
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;
105
106
884k
            if (block_id != last_block_id) {
107
833k
                if (last_block_id != static_cast<size_t>(-1)) {
108
510
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
109
510
                }
110
833k
                last_block_id = block_id;
111
833k
                current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
112
833k
            }
113
884k
        } else {
114
            // Backward: first_rowid is overwritten for every page, so when a block is
            // recorded it carries the first ordinal of the LAST page seen in that block
            // (not the block's very last ordinal).
115
667
            size_t block_id = _offset_to_block_id(page.offset);
116
721
            if (block_id != last_block_id) {
117
721
                if (last_block_id != static_cast<size_t>(-1)) {
118
0
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
119
0
                }
120
721
                last_block_id = block_id;
121
721
            }
122
667
            current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
123
667
        }
124
885k
    }
125
126
    // Add the last block
127
834k
    if (last_block_id != static_cast<size_t>(-1)) {
128
834k
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
129
834k
    }
130
131
    // Reverse for backward reading
132
833k
    if (!_is_forward && !_block_sequence.empty()) {
133
721
        std::ranges::reverse(_block_sequence);
134
721
    }
135
833k
}
136
137
// Builds the block sequence of every prefetcher in `prefetchers` from the row ids contained
// in `row_bitmap`.
//
// Steps: begin_build_blocks_by_rowids() on each prefetcher, then the bitmap is drained in
// batches of config::segment_file_cache_consume_rowids_batch_size row ids and every batch is
// handed to each prefetcher's add_rowids(), and finally finish_build_blocks_by_rowids() is
// called on each prefetcher. The bitmap is therefore iterated only once, however many
// prefetchers are passed in.
//
// NOTE(review): batch_size is taken from config without validation and is used both as the
// vector length and as the read count - presumably it is always positive; confirm.
void SegmentPrefetcher::build_blocks_by_rowids(const roaring::Roaring& row_bitmap,
138
16.6k
                                               const std::vector<SegmentPrefetcher*>& prefetchers) {
139
98.2k
    for (auto* prefetcher : prefetchers) {
140
98.2k
        prefetcher->begin_build_blocks_by_rowids();
141
98.2k
    }
142
143
16.6k
    int batch_size = config::segment_file_cache_consume_rowids_batch_size;
144
16.6k
    // Scratch buffer reused for every batch read from the bitmap.
    std::vector<rowid_t> rowids(batch_size);
145
16.6k
    roaring::api::roaring_uint32_iterator_t iter;
146
16.6k
    roaring::api::roaring_init_iterator(&row_bitmap.roaring, &iter);
147
16.6k
    // `num` is the number of row ids placed into the buffer; 0 ends the loop below.
    uint32_t num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size);
148
149
40.4k
    for (; num > 0;
150
23.7k
         num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), batch_size)) {
151
134k
        for (auto* prefetcher : prefetchers) {
152
134k
            prefetcher->add_rowids(rowids.data(), num);
153
134k
        }
154
23.7k
    }
155
156
98.1k
    for (auto* prefetcher : prefetchers) {
157
98.1k
        prefetcher->finish_build_blocks_by_rowids();
158
98.1k
    }
159
16.6k
}
160
161
98.2k
// Prepares this prefetcher for an incremental build through add_rowids(): drops the current
// block sequence and cursors (reset_blocks()) and rewinds the page cursor to the first page.
//
// NOTE(review): last_block_id and current_block_first_rowid are not reset here, whereas
// build_all_data_blocks() resets both explicitly. If a prefetcher can be rebuilt after an
// earlier build, add_rowids() would start from stale values - confirm whether such reuse
// is possible.
void SegmentPrefetcher::begin_build_blocks_by_rowids() {
162
98.2k
    reset_blocks();
163
98.2k
    page_idx = 0;
164
98.2k
}
165
166
98.0k
// Completes an incremental build started with begin_build_blocks_by_rowids().
//
// Appends the block that was still in progress (if any block was seen at all), reverses the
// sequence when reading backward, and - when config::enable_segment_prefetch_verbose_log is
// set - logs the resulting (block_id, first_rowid) list.
//
// Does nothing when ordinal_index is null.
void SegmentPrefetcher::finish_build_blocks_by_rowids() {
167
98.0k
    if (ordinal_index == nullptr) {
168
0
        return;
169
0
    }
170
98.2k
    // static_cast<size_t>(-1) means add_rowids() never recorded a block.
    if (last_block_id != static_cast<size_t>(-1)) {
171
98.2k
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
172
98.2k
    }
173
174
98.0k
    // Blocks were collected in ascending rowid order; backward readers consume them reversed.
    if (!_is_forward && !_block_sequence.empty()) {
175
2.76k
        std::ranges::reverse(_block_sequence);
176
2.76k
    }
177
178
18.4E
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
179
18.4E
            "[verbose] SegmentPrefetcher initialized with block count={}, is_forward={}, "
180
18.4E
            "num_pages={}, path={}, blocks: (block_id, first_rowid)=[{}]",
181
18.4E
            _block_sequence.size(), _is_forward, ordinal_index->_num_pages, _path,
182
18.4E
            fmt::join(_block_sequence | std::views::transform([](const auto& b) {
183
0
                          return fmt::format("({}, {})", b.block_id, b.first_rowid);
184
0
                      }),
185
18.4E
                      ","));
186
98.0k
}
187
188
1.86M
// Drops all built blocks and rewinds both consumption cursors: _current_block_index goes
// back to 0 and _prefetched_index to -1 (need_prefetch() pre-increments it before use, so
// -1 means "nothing prefetched yet").
// Does not touch page_idx, last_block_id or current_block_first_rowid.
void SegmentPrefetcher::reset_blocks() {
189
1.86M
    _block_sequence.clear();
190
1.86M
    _current_block_index = 0;
191
1.86M
    _prefetched_index = -1;
192
1.86M
}
193
194
// Binds the prefetcher to one column.
//
// Clears any previously built blocks, derives the read direction from
// read_options.read_orderby_key_reverse, remembers the file name of the column's file
// reader in _path, and obtains the column's ordinal index reader into ordinal_index.
//
// @param column_reader  must be non-null (DCHECK only).
// @param read_options   supplies the read direction and the stats object passed on to
//                       get_ordinal_index_reader().
// @return the error from get_ordinal_index_reader() if it fails, Status::OK() otherwise.
Status SegmentPrefetcher::init(std::shared_ptr<ColumnReader> column_reader,
195
931k
                               const StorageReadOptions& read_options) {
196
931k
    DCHECK(column_reader != nullptr);
197
198
931k
    reset_blocks();
199
931k
    _is_forward = !read_options.read_orderby_key_reverse;
200
931k
    _path = column_reader->_file_reader->path().filename().native();
201
202
931k
    RETURN_IF_ERROR(column_reader->get_ordinal_index_reader(ordinal_index, read_options.stats));
203
931k
    return Status::OK();
204
931k
}
205
206
1.55M
// Decides which blocks should be prefetched now that reading has reached `current_rowid`.
//
// The current-block cursor is first advanced past every block whose first_rowid has already
// been reached (<= current_rowid when reading forward, >= current_rowid when reading
// backward). Blocks lying before the current one are then marked as already handled, and
// ranges for the following blocks are appended while window_size() stays below
// _config.prefetch_window_size.
//
// @param current_rowid  row id the reader is currently positioned at.
// @param out_ranges     must be non-null (DCHECK only). Cleared and refilled with the byte
//                       ranges to prefetch; left untouched on the early `return false` path.
// @return true if at least one range was appended to out_ranges, false otherwise
//         (including when no blocks exist or every block has already been prefetched).
bool SegmentPrefetcher::need_prefetch(rowid_t current_rowid, std::vector<BlockRange>* out_ranges) {
207
1.55M
    DCHECK(out_ranges != nullptr);
208
18.4E
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
209
18.4E
            << fmt::format("[verbose] SegmentPrefetcher need_prefetch enter current_rowid={}, {}",
210
18.4E
                           current_rowid, debug_string());
211
1.55M
    // Nothing to do: no blocks were built, or the last block has already been prefetched.
    if (_block_sequence.empty() ||
212
1.55M
        _prefetched_index >= static_cast<int>(_block_sequence.size()) - 1) {
213
646k
        return false;
214
646k
    }
215
216
18.4E
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
217
18.4E
            "[verbose] SegmentPrefetcher need_prefetch called with current_rowid={}, {}, "
218
18.4E
            "block=(id={}, first_rowid={})",
219
18.4E
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
220
18.4E
            _block_sequence[_current_block_index].first_rowid);
221
904k
    // Advance the cursor to the last block whose first_rowid has been reached.
    if (_is_forward) {
222
901k
        while (_current_block_index + 1 < _block_sequence.size() &&
223
901k
               _block_sequence[_current_block_index + 1].first_rowid <= current_rowid) {
224
80
            _current_block_index++;
225
80
        }
226
901k
    } else {
227
2.86k
        while (_current_block_index + 1 < _block_sequence.size() &&
228
2.86k
               _block_sequence[_current_block_index + 1].first_rowid >= current_rowid) {
229
0
            _current_block_index++;
230
0
        }
231
2.86k
    }
232
233
904k
    out_ranges->clear();
234
    // for non-predicate column, some rowids in row_bitmap may be filtered out after vec evaluation of predicate columns,
235
    // so we should not prefetch for these rows
    // Raising _prefetched_index to (_current_block_index - 1) treats every block before the
    // current one as done, so the loop below starts no earlier than the current block.
236
904k
    _prefetched_index = std::max(_prefetched_index, _current_block_index - 1);
237
1.80M
    while (_prefetched_index + 1 < _block_sequence.size() &&
238
1.80M
           window_size() < _config.prefetch_window_size) {
239
903k
        out_ranges->push_back(_block_id_to_range(_block_sequence[++_prefetched_index].block_id));
240
903k
    }
241
242
18.4E
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
243
18.4E
            "[verbose] SegmentPrefetcher need_prefetch after calc with current_rowid={}, {}, "
244
18.4E
            "block=(id={}, first_rowid={})",
245
18.4E
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
246
18.4E
            _block_sequence[_current_block_index].first_rowid);
247
248
904k
    bool triggered = !out_ranges->empty();
249
904k
    if (triggered) {
250
902k
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
251
96
                "[verbose] SegmentPrefetcher prefetch triggered at rowid={}, {}, prefetch {} "
252
96
                "blocks: (offset, size)=[{}]",
253
96
                current_rowid, debug_string(), out_ranges->size(),
254
96
                fmt::join(*out_ranges | std::views::transform([](const auto& b) {
255
0
                    return fmt::format("({}, {})", b.offset, b.size);
256
0
                }),
257
96
                          ","));
258
902k
    }
259
904k
    return triggered;
260
1.55M
}
261
262
} // namespace doris::segment_v2