Coverage Report

Created: 2026-04-14 13:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/binary_prefix_page.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/binary_prefix_page.h"
19
20
#include <stddef.h>
21
#include <stdint.h>
22
23
#include <algorithm>
24
#include <vector>
25
26
#include "common/status.h"
27
#include "util/coding.h"
28
#include "util/faststring.h"
29
#include "util/slice.h"
30
31
namespace doris {
32
namespace segment_v2 {
33
34
134k
// Appends up to `*add_count` values to the page.
//
// `vals` is reinterpreted as an array of `Slice`s. Values are prefix-encoded
// against the previously added value (kept in `_last_entry`):
//   - every RESTART_POINT_INTERVAL-th value starts a restart point: it is stored
//     in full (share_len == 0) and its offset in `_buffer` is recorded in
//     `_restart_points_offset`;
//   - every other value stores only the bytes that differ from the previous one.
// Each entry is written as
//   varint32(share_len) varint32(non_share_len) <non_share_len bytes>
// The very first value of the page is also copied into `_first_entry`.
//
// Adding stops early as soon as is_page_full() reports true; on return
// `*add_count` holds the number of values actually consumed.
Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) {
    DCHECK(!_finished);
    if (*add_count == 0) {
        return Status::OK();
    }

    const Slice* src = reinterpret_cast<const Slice*>(vals);
    if (_count == 0) {
        _first_entry.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()),
                                 src->get_size());
    }

    size_t i = 0;
    for (; i < *add_count; ++i, ++src) {
        if (is_page_full()) {
            break;
        }
        const char* entry = src->data;
        size_t entry_len = src->size;
        size_t old_size = _buffer.size();

        size_t share_len = 0;
        if (_count % RESTART_POINT_INTERVAL == 0) {
            // Restart point: stored in full so a reader can start decoding here.
            _restart_points_offset.push_back(cast_set<uint32_t>(old_size));
        } else {
            // Compare raw bytes as unsigned on both sides. `entry` is `const char*`
            // (signed on common platforms); comparing it element-wise against an
            // unsigned byte buffer would promote e.g. 0xC3 to -61 vs 195 and treat
            // equal bytes >= 0x80 as different, cutting the shared prefix short for
            // non-ASCII data such as UTF-8 text.
            const auto* cur = reinterpret_cast<const uint8_t*>(entry);
            const auto* prev = reinterpret_cast<const uint8_t*>(_last_entry.data());
            size_t max_share_len = std::min(_last_entry.size(), entry_len);
            share_len = static_cast<size_t>(
                    std::mismatch(cur, cur + max_share_len, prev).first - cur);
        }
        size_t non_share_len = entry_len - share_len;
        // Growing the buffers may need a lot of memory; an allocation failure is
        // turned into an error Status here instead of bringing down the BE (OOM).
        RETURN_IF_CATCH_EXCEPTION({
            put_varint32(&_buffer, cast_set<uint32_t>(share_len));
            put_varint32(&_buffer, cast_set<uint32_t>(non_share_len));
            _buffer.append(entry + share_len, non_share_len);

            _last_entry.clear();
            _last_entry.append(entry, entry_len);
        });

        _raw_data_size += entry_len;
        ++_count;
    }
    *add_count = i;
    return Status::OK();
}
87
88
243
// Seals the page and stores the encoded bytes in `*slice`.
//
// After the prefix-encoded entries, the following footer is appended to `_buffer`:
//   fixed32_le  number of values in the page
//   1 byte      RESTART_POINT_INTERVAL
//   fixed32_le  buffer offset of each restart point, in insertion order
//   fixed32_le  number of restart points
// and `*slice` receives the result of `_buffer.build()`. The builder is marked
// finished before encoding; an allocation failure while appending is turned into
// an error Status by RETURN_IF_CATCH_EXCEPTION.
Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) {
    DCHECK(!_finished);
    _finished = true;
    RETURN_IF_CATCH_EXCEPTION({
        put_fixed32_le(&_buffer, static_cast<uint32_t>(_count));

        uint8_t interval_byte = RESTART_POINT_INTERVAL;
        _buffer.append(&interval_byte, 1);

        const auto num_restart_points = _restart_points_offset.size();
        for (const auto offset : _restart_points_offset) {
            put_fixed32_le(&_buffer, offset);
        }
        put_fixed32_le(&_buffer, cast_set<uint32_t>(num_restart_points));

        *slice = _buffer.build();
    });
    return Status::OK();
}
104
105
const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared,
106
224k
                                                              uint32_t* non_shared) {
107
224k
    if ((ptr = decode_varint32_ptr(ptr, _footer_start, shared)) == nullptr) {
108
0
        return nullptr;
109
0
    }
110
224k
    if ((ptr = decode_varint32_ptr(ptr, _footer_start, non_shared)) == nullptr) {
111
0
        return nullptr;
112
0
    }
113
224k
    if (_footer_start - ptr < *non_shared) {
114
0
        return nullptr;
115
0
    }
116
224k
    return ptr;
117
224k
}
118
119
224k
// Decodes the entry at `_next_ptr` into `_current_value` and advances `_next_ptr`
// past it. `_cur_pos` is not modified here; callers set it to the index of the
// entry being decoded before calling.
//
// An entry reuses the first `shared_len` bytes of the previously decoded value
// (still held in `_current_value`) and appends `non_shared_len` new bytes.
//
// Returns EndOfFile when `_cur_pos >= _num_values`, and Corruption when the entry
// header cannot be decoded or claims to reuse more bytes than the previous value has.
Status BinaryPrefixPageDecoder::_read_next_value() {
    if (_cur_pos >= _num_values) {
        return Status::EndOfFile("no more value to read");
    }
    uint32_t shared_len = 0;
    uint32_t non_shared_len = 0;
    auto data_ptr = _decode_value_lengths(_next_ptr, &shared_len, &non_shared_len);
    if (data_ptr == nullptr) {
        DCHECK(false) << "[BinaryPrefixPageDecoder::_read_next_value] corruption!";
        return Status::Corruption("Failed to decode value at position {}", _cur_pos);
    }
    // A valid entry can only share bytes that the previous value actually has. A
    // larger shared_len would make resize() grow `_current_value` with bytes that
    // were never decoded from this page.
    if (shared_len > _current_value.size()) {
        return Status::Corruption(
                "Invalid shared prefix length {} (previous value length {}) at position {}",
                shared_len, _current_value.size(), _cur_pos);
    }
    _current_value.resize(shared_len);
    _current_value.append(data_ptr, non_shared_len);
    _next_ptr = data_ptr + non_shared_len;
    return Status::OK();
}
135
136
46.0k
// Moves the decoder to restart point `restart_point_index` and decodes its entry.
// `_cur_pos` becomes restart_point_index * _restart_point_internal, which is the
// index of the first value in that restart block.
Status BinaryPrefixPageDecoder::_seek_to_restart_point(size_t restart_point_index) {
    const size_t first_value_index = restart_point_index * _restart_point_internal;
    _cur_pos = cast_set<uint32_t>(first_value_index);
    _next_ptr = _get_restart_point(restart_point_index);
    return _read_next_value();
}
141
142
203
// Parses the page footer and positions the decoder on the first value.
//
// Footer layout, read backwards from the end of `_data`:
//   [entries...][num_values: fixed32_le][restart interval: 1 byte]
//   [restart offsets: fixed32_le * num_restarts][num_restarts: fixed32_le]
//
// Returns Corruption if the footer does not fit inside the page, or if the restart
// interval is zero (it is used as a divisor/multiplier when seeking). For an empty
// page (num_values == 0) returns OK without decoding anything.
Status BinaryPrefixPageDecoder::init() {
    _cur_pos = 0;
    _next_ptr = reinterpret_cast<const uint8_t*>(_data.get_data());
    const size_t page_size = _data.get_size();

    // Smallest possible footer: num_values(4) + interval(1) + num_restarts(4).
    constexpr size_t kMinFooterSize = 4 + 1 + 4;
    if (page_size < kMinFooterSize) {
        return Status::Corruption("Binary prefix page too small: {} bytes", page_size);
    }

    const uint8_t* end = _next_ptr + page_size;
    _num_restarts = decode_fixed32_le(end - 4);
    // Computed in size_t so a corrupted, huge restart count cannot wrap around.
    const size_t restarts_bytes = (static_cast<size_t>(_num_restarts) + 1) * 4;
    const size_t footer_size = 4 + 1 + restarts_bytes;
    if (footer_size > page_size) {
        return Status::Corruption(
                "Binary prefix page footer ({} bytes, {} restart points) exceeds page size {}",
                footer_size, _num_restarts, page_size);
    }
    _restarts_ptr = end - restarts_bytes;
    _footer_start = _restarts_ptr - 4 - 1;
    _num_values = decode_fixed32_le(_footer_start);
    _restart_point_internal = decode_fixed8(_footer_start + 4);
    if (_restart_point_internal == 0) {
        return Status::Corruption("Binary prefix page has a zero restart point interval");
    }
    _parsed = true;
    if (_num_values == 0) {
        // Nothing to decode in an empty page; _read_next_value() would report
        // EndOfFile here, although an empty page is valid.
        return Status::OK();
    }
    return _read_next_value();
}
155
156
227
// Positions the decoder on the value at index `pos` within the page.
//
// `pos == _num_values` is accepted and leaves the decoder just past the last value
// without decoding anything. Otherwise the decoder jumps to the restart point at
// or before `pos` and decodes forward until `_cur_pos == pos`.
//
// Returns INTERNAL_ERROR if `pos` is greater than the number of values.
Status BinaryPrefixPageDecoder::seek_to_position_in_page(size_t pos) {
    DCHECK(_parsed);
    DCHECK_LE(pos, _num_values);
    // Also checked at runtime: DCHECK_LE only fires in debug builds. An out-of-range
    // `pos` could otherwise select a restart point index past `_num_restarts`, or
    // decode beyond the last value.
    if (pos > _num_values) [[unlikely]] {
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                "seek pos {} is larger than total elements  {}", pos, _num_values);
    }
    // seek past the last value is valid
    if (pos == _num_values) {
        _cur_pos = cast_set<uint32_t>(_num_values);
        return Status::OK();
    }

    size_t restart_point_index = pos / _restart_point_internal;
    RETURN_IF_ERROR(_seek_to_restart_point(restart_point_index));
    while (_cur_pos < pos) {
        _cur_pos++;
        RETURN_IF_ERROR(_read_next_value());
    }
    return Status::OK();
}
179
180
5.23k
// Positions the decoder on the first value >= `*value` (interpreted as a Slice) and
// sets `*exact_match` to whether that value equals the target.
//
// Restart-point entries are stored in full, so a binary search over them narrows
// the range. Entries are then decoded linearly, starting from the last restart point
// whose value is < target. Returns ENTRY_NOT_FOUND when every value in the page is
// smaller than the target.
Status BinaryPrefixPageDecoder::seek_at_or_after_value(const void* value, bool* exact_match) {
    DCHECK(_parsed);
    const Slice target = *reinterpret_cast<const Slice*>(value);

    // Half-open window [lo, hi) over restart point indices. On exit `lo` is the
    // first restart point whose value is >= target, or _num_restarts if there is none.
    uint32_t lo = 0;
    uint32_t hi = _num_restarts;
    while (lo < hi) {
        const uint32_t mid = lo + (hi - lo) / 2;
        RETURN_IF_ERROR(_seek_to_restart_point(mid));
        const bool mid_is_smaller = Slice(_current_value).compare(target) < 0;
        if (mid_is_smaller) {
            lo = mid + 1;
        } else {
            hi = mid;
        }
    }

    // Values in the block preceding restart point `lo` may still be >= target, so
    // scan from the previous restart point (or from the first one when lo == 0).
    RETURN_IF_ERROR(_seek_to_restart_point(lo == 0 ? 0 : lo - 1));
    for (;;) {
        const int cmp = Slice(_current_value).compare(target);
        if (cmp >= 0) {
            *exact_match = (cmp == 0);
            return Status::OK();
        }
        ++_cur_pos;
        Status st = _read_next_value();
        if (st.is<ErrorCode::END_OF_FILE>()) {
            return Status::Error<ErrorCode::ENTRY_NOT_FOUND, false>(
                    "all value small than the value");
        }
        if (!st.ok()) {
            return st;
        }
    }
}
223
224
258
// Appends up to `*n` consecutive values, starting at `_cur_pos`, to `dst`.
//
// On success `*n` is set to the number of values appended. It is 0 when `*n` was
// already 0 or the decoder is past the last value. After each value is copied, the
// following value is decoded into `_current_value`, unless the end of the page has
// been reached.
Status BinaryPrefixPageDecoder::next_batch(size_t* n, MutableColumnPtr& dst) {
    DCHECK(_parsed);
    if (*n == 0 || _cur_pos >= _num_values) [[unlikely]] {
        *n = 0;
        return Status::OK();
    }
    const size_t remaining = static_cast<size_t>(_num_values - _cur_pos);
    const size_t to_read = std::min(*n, remaining);

    for (size_t copied = 0; copied < to_read; ++copied) {
        dst->insert_data(reinterpret_cast<const char*>(_current_value.data()),
                         _current_value.size());
        ++_cur_pos;
        // Do not try to decode past the final value of the page.
        if (_cur_pos >= _num_values) {
            continue;
        }
        RETURN_IF_ERROR(_read_next_value());
    }

    *n = to_read;
    return Status::OK();
}
245
246
} // namespace segment_v2
247
} // namespace doris