Coverage Report

Created: 2026-03-17 12:19

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment_prefetcher.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment_prefetcher.h"
19
20
#include <algorithm>
21
#include <ranges>
22
23
#include "common/config.h"
24
#include "common/logging.h"
25
#include "storage/index/ordinal_page_index.h"
26
#include "storage/iterators.h"
27
#include "storage/segment/column_reader.h"
28
29
namespace doris::segment_v2 {
30
31
109k
void SegmentPrefetcher::add_rowids(const rowid_t* rowids, uint32_t num) {
32
109k
    if (ordinal_index == nullptr) {
33
0
        return;
34
0
    }
35
109k
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
36
109k
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
37
109k
    const int num_pages = ordinal_index->_num_pages;
38
198M
    for (uint32_t i = 0; i < num; ++i) {
39
198M
        rowid_t rowid = rowids[i];
40
41
198M
        if (_is_forward) {
42
198M
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
43
170k
                page_idx++;
44
170k
            }
45
46
198M
            const auto& page = pages[page_idx];
47
198M
            size_t page_start_block = _offset_to_block_id(page.offset);
48
198M
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);
49
50
            // If page spans two blocks, assign it to the next block (page_end_block)
51
198M
            size_t block_id =
52
198M
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;
53
54
198M
            if (block_id != last_block_id) {
55
76.6k
                if (last_block_id != static_cast<size_t>(-1)) {
56
427
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
57
427
                }
58
76.6k
                last_block_id = block_id;
59
76.6k
                current_block_first_rowid = rowid;
60
76.6k
            }
61
198M
        } else {
62
            // Backward reading: we need the last rowid in each block as the "first" rowid
63
            // (because when reading backwards, we encounter the largest rowid first)
64
            //
65
            // Strategy: iterate forward through bitmap, but for each block,
66
            // keep updating current_block_first_rowid to the latest (largest) rowid in that block
67
338k
            while (page_idx < num_pages - 1 && ordinals[page_idx + 1] <= rowid) {
68
246
                page_idx++;
69
246
            }
70
338k
            size_t block_id = _offset_to_block_id(pages[page_idx].offset);
71
72
338k
            if (block_id != last_block_id) {
73
326
                if (last_block_id != static_cast<size_t>(-1)) {
74
0
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
75
0
                }
76
326
                last_block_id = block_id;
77
326
            }
78
338k
            current_block_first_rowid = rowid;
79
338k
        }
80
198M
    }
81
109k
}
82
83
1.17M
void SegmentPrefetcher::build_all_data_blocks() {
84
1.17M
    if (ordinal_index == nullptr) {
85
0
        return;
86
0
    }
87
1.17M
    reset_blocks();
88
1.17M
    const auto& ordinals = ordinal_index->_ordinals; // ordinals[i] = first ordinal of page i
89
1.17M
    const auto& pages = ordinal_index->_pages;       // pages[i] = page pointer of page i
90
1.17M
    const int num_pages = ordinal_index->_num_pages;
91
92
1.17M
    last_block_id = static_cast<size_t>(-1);
93
1.17M
    current_block_first_rowid = 0;
94
95
2.39M
    for (page_idx = 0; page_idx < num_pages; ++page_idx) {
96
1.22M
        const auto& page = pages[page_idx];
97
98
1.22M
        if (_is_forward) {
99
1.22M
            size_t page_start_block = _offset_to_block_id(page.offset);
100
1.22M
            size_t page_end_block = _offset_to_block_id(page.offset + page.size - 1);
101
102
            // If page spans two blocks, assign it to the next block (page_end_block)
103
1.22M
            size_t block_id =
104
1.22M
                    (page_start_block != page_end_block) ? page_end_block : page_start_block;
105
106
1.22M
            if (block_id != last_block_id) {
107
1.17M
                if (last_block_id != static_cast<size_t>(-1)) {
108
492
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
109
492
                }
110
1.17M
                last_block_id = block_id;
111
1.17M
                current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
112
1.17M
            }
113
1.22M
        } else {
114
            // Backward: use the last ordinal in each block as first_rowid
115
291
            size_t block_id = _offset_to_block_id(page.offset);
116
624
            if (block_id != last_block_id) {
117
624
                if (last_block_id != static_cast<size_t>(-1)) {
118
0
                    _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
119
0
                }
120
624
                last_block_id = block_id;
121
624
            }
122
291
            current_block_first_rowid = static_cast<rowid_t>(ordinals[page_idx]);
123
291
        }
124
1.22M
    }
125
126
    // Add the last block
127
1.17M
    if (last_block_id != static_cast<size_t>(-1)) {
128
1.17M
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
129
1.17M
    }
130
131
    // Reverse for backward reading
132
1.17M
    if (!_is_forward && !_block_sequence.empty()) {
133
624
        std::ranges::reverse(_block_sequence);
134
624
    }
135
1.17M
}
136
137
// Builds the block sequence of every prefetcher in `prefetchers` from the row ids
// set in `row_bitmap`.
//
// The bitmap is decoded only once, in batches of
// config::segment_file_cache_consume_rowids_batch_size row ids, and every batch is
// handed to each prefetcher in turn. This shares the cost of iterating the bitmap
// across all prefetchers instead of paying it once per prefetcher.
//
// @param row_bitmap   row ids that will be read (iterated in ascending order)
// @param prefetchers  prefetchers to rebuild; an empty list is a no-op
void SegmentPrefetcher::build_blocks_by_rowids(const roaring::Roaring& row_bitmap,
                                               const std::vector<SegmentPrefetcher*>& prefetchers) {
    if (prefetchers.empty()) {
        // Nobody would consume the row ids; skip decoding the bitmap entirely.
        return;
    }

    for (auto* prefetcher : prefetchers) {
        prefetcher->begin_build_blocks_by_rowids();
    }

    // A non-positive batch size is unusable: a negative value would be converted to
    // a huge size_t when sizing the buffer, and zero would make the read loop below
    // consume nothing, silently leaving every prefetcher without blocks.
    int batch_size = config::segment_file_cache_consume_rowids_batch_size;
    if (batch_size <= 0) {
        batch_size = 1;
    }
    const auto read_count = static_cast<uint32_t>(batch_size);
    std::vector<rowid_t> rowids(static_cast<size_t>(batch_size));

    roaring::api::roaring_uint32_iterator_t iter;
    roaring::api::roaring_init_iterator(&row_bitmap.roaring, &iter);
    for (uint32_t num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), read_count);
         num > 0;
         num = roaring::api::roaring_read_uint32_iterator(&iter, rowids.data(), read_count)) {
        for (auto* prefetcher : prefetchers) {
            prefetcher->add_rowids(rowids.data(), num);
        }
    }

    for (auto* prefetcher : prefetchers) {
        prefetcher->finish_build_blocks_by_rowids();
    }
}
160
161
76.4k
void SegmentPrefetcher::begin_build_blocks_by_rowids() {
162
76.4k
    reset_blocks();
163
76.4k
    page_idx = 0;
164
76.4k
}
165
166
76.5k
void SegmentPrefetcher::finish_build_blocks_by_rowids() {
167
76.5k
    if (ordinal_index == nullptr) {
168
0
        return;
169
0
    }
170
76.5k
    if (last_block_id != static_cast<size_t>(-1)) {
171
76.5k
        _block_sequence.emplace_back(last_block_id, current_block_first_rowid);
172
76.5k
    }
173
174
76.5k
    if (!_is_forward && !_block_sequence.empty()) {
175
326
        std::ranges::reverse(_block_sequence);
176
326
    }
177
178
18.4E
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
179
18.4E
            "[verbose] SegmentPrefetcher initialized with block count={}, is_forward={}, "
180
18.4E
            "num_pages={}, path={}, blocks: (block_id, first_rowid)=[{}]",
181
18.4E
            _block_sequence.size(), _is_forward, ordinal_index->_num_pages, _path,
182
18.4E
            fmt::join(_block_sequence | std::views::transform([](const auto& b) {
183
0
                          return fmt::format("({}, {})", b.block_id, b.first_rowid);
184
0
                      }),
185
18.4E
                      ","));
186
76.5k
}
187
188
2.49M
void SegmentPrefetcher::reset_blocks() {
189
2.49M
    _block_sequence.clear();
190
2.49M
    _current_block_index = 0;
191
2.49M
    _prefetched_index = -1;
192
2.49M
}
193
194
// Binds this prefetcher to `column_reader` and clears any previous block sequence.
// It records the scan direction from `read_options` (a reverse key-ordered read
// means a backward scan) and keeps the file name for verbose log messages. It then
// loads the column's ordinal index through get_ordinal_index_reader(). Any error
// from that call is returned unchanged.
Status SegmentPrefetcher::init(std::shared_ptr<ColumnReader> column_reader,
                               const StorageReadOptions& read_options) {
    DCHECK(column_reader != nullptr);

    reset_blocks();
    _path = column_reader->_file_reader->path().filename().native();
    _is_forward = !read_options.read_orderby_key_reverse;

    RETURN_IF_ERROR(column_reader->get_ordinal_index_reader(ordinal_index, read_options.stats));
    return Status::OK();
}
205
206
2.33M
// Decides which blocks should be prefetched now that the reader is at
// `current_rowid`, and appends their byte ranges to `out_ranges`.
//
// Steps:
//  1. If the sequence is empty or every block has already been prefetched,
//     return false immediately; `out_ranges` is left untouched in that case.
//  2. Advance _current_block_index past every following block whose first_rowid
//     the scan has already reached. For forward scans that means
//     first_rowid <= current_rowid; for backward scans, first_rowid >= current_rowid.
//  3. Clear `out_ranges` and top up the prefetch window. Blocks after
//     _prefetched_index are appended until window_size() (defined elsewhere)
//     reaches _config.prefetch_window_size or the sequence ends. Blocks before the
//     current one are never prefetched: rows the scan skipped (e.g. rows filtered
//     out by predicate columns) must not trigger IO.
//
// Returns true iff at least one range was appended.
bool SegmentPrefetcher::need_prefetch(rowid_t current_rowid, std::vector<BlockRange>* out_ranges) {
    DCHECK(out_ranges != nullptr);
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
            << fmt::format("[verbose] SegmentPrefetcher need_prefetch enter current_rowid={}, {}",
                           current_rowid, debug_string());

    const bool nothing_left_to_prefetch =
            _block_sequence.empty() ||
            _prefetched_index >= static_cast<int>(_block_sequence.size()) - 1;
    if (nothing_left_to_prefetch) {
        return false;
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher need_prefetch called with current_rowid={}, {}, "
            "block=(id={}, first_rowid={})",
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
            _block_sequence[_current_block_index].first_rowid);

    // True once the scan has reached `next`: forward scans move up through row ids,
    // backward scans move down.
    auto scan_reached = [&](const auto& next) {
        return _is_forward ? next.first_rowid <= current_rowid
                           : next.first_rowid >= current_rowid;
    };
    while (_current_block_index + 1 < _block_sequence.size() &&
           scan_reached(_block_sequence[_current_block_index + 1])) {
        ++_current_block_index;
    }

    out_ranges->clear();
    // Never prefetch anything before the current block.
    _prefetched_index = std::max(_prefetched_index, _current_block_index - 1);
    while (_prefetched_index + 1 < _block_sequence.size() &&
           window_size() < _config.prefetch_window_size) {
        ++_prefetched_index;
        out_ranges->push_back(_block_id_to_range(_block_sequence[_prefetched_index].block_id));
    }

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentPrefetcher need_prefetch after calc with current_rowid={}, {}, "
            "block=(id={}, first_rowid={})",
            current_rowid, debug_string(), _block_sequence[_current_block_index].block_id,
            _block_sequence[_current_block_index].first_rowid);

    const bool triggered = !out_ranges->empty();
    if (triggered) {
        auto describe_range = [](const auto& b) {
            return fmt::format("({}, {})", b.offset, b.size);
        };
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
                "[verbose] SegmentPrefetcher prefetch triggered at rowid={}, {}, prefetch {} "
                "blocks: (offset, size)=[{}]",
                current_rowid, debug_string(), out_ranges->size(),
                fmt::join(*out_ranges | std::views::transform(describe_range), ","));
    }
    return triggered;
}
261
262
} // namespace doris::segment_v2