Coverage Report

Created: 2026-06-28 02:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/binary_prefix_page.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/binary_prefix_page.h"
19
20
#include <stddef.h>
21
#include <stdint.h>
22
23
#include <algorithm>
24
#include <vector>
25
26
#include "common/status.h"
27
#include "util/coding.h"
28
#include "util/faststring.h"
29
#include "util/slice.h"
30
31
namespace doris {
32
namespace segment_v2 {
33
34
134k
Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) {
35
134k
    DCHECK(!_finished);
36
134k
    if (*add_count == 0) {
37
0
        return Status::OK();
38
0
    }
39
40
134k
    const Slice* src = reinterpret_cast<const Slice*>(vals);
41
42
134k
    int i = 0;
43
269k
    for (; i < *add_count; ++i, ++src) {
44
134k
        if (is_page_full()) {
45
0
            break;
46
0
        }
47
134k
        const char* entry = src->data;
48
134k
        size_t entry_len = src->size;
49
134k
        size_t old_size = _buffer.size();
50
51
134k
        size_t share_len;
52
134k
        if (_count % RESTART_POINT_INTERVAL == 0) {
53
8.67k
            share_len = 0;
54
8.67k
            _restart_points_offset.push_back(cast_set<uint32_t>(old_size));
55
126k
        } else {
56
126k
            size_t max_share_len = std::min(_last_entry.size(), entry_len);
57
126k
            share_len = max_share_len;
58
262k
            for (int j = 0; j < max_share_len; ++j) {
59
262k
                if (entry[j] != _last_entry[j]) {
60
126k
                    share_len = j;
61
126k
                    break;
62
126k
                }
63
262k
            }
64
126k
        }
65
134k
        size_t non_share_len = entry_len - share_len;
66
        // This may need a large memory, should return error if could not allocated
67
        // successfully, to avoid BE OOM.
68
134k
        RETURN_IF_CATCH_EXCEPTION({
69
134k
            put_varint32(&_buffer, cast_set<uint32_t>(share_len));
70
134k
            put_varint32(&_buffer, cast_set<uint32_t>(non_share_len));
71
134k
            _buffer.append(entry + share_len, non_share_len);
72
73
134k
            _last_entry.clear();
74
134k
            _last_entry.append(entry, entry_len);
75
134k
        });
76
77
134k
        _raw_data_size += entry_len;
78
134k
        ++_count;
79
134k
    }
80
134k
    *add_count = i;
81
134k
    return Status::OK();
82
134k
}
83
84
328
Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) {
85
328
    DCHECK(!_finished);
86
328
    _finished = true;
87
328
    RETURN_IF_CATCH_EXCEPTION({
88
328
        put_fixed32_le(&_buffer, (uint32_t)_count);
89
328
        uint8_t restart_point_internal = RESTART_POINT_INTERVAL;
90
328
        _buffer.append(&restart_point_internal, 1);
91
328
        auto restart_point_size = _restart_points_offset.size();
92
328
        for (uint32_t i = 0; i < restart_point_size; ++i) {
93
328
            put_fixed32_le(&_buffer, _restart_points_offset[i]);
94
328
        }
95
328
        put_fixed32_le(&_buffer, cast_set<uint32_t>(restart_point_size));
96
328
        *slice = _buffer.build();
97
328
    });
98
328
    return Status::OK();
99
328
}
100
101
const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared,
102
227k
                                                              uint32_t* non_shared) {
103
227k
    if ((ptr = decode_varint32_ptr(ptr, _footer_start, shared)) == nullptr) {
104
0
        return nullptr;
105
0
    }
106
227k
    if ((ptr = decode_varint32_ptr(ptr, _footer_start, non_shared)) == nullptr) {
107
0
        return nullptr;
108
0
    }
109
227k
    if (_footer_start - ptr < *non_shared) {
110
0
        return nullptr;
111
0
    }
112
227k
    return ptr;
113
227k
}
114
115
228k
Status BinaryPrefixPageDecoder::_read_next_value() {
116
228k
    if (_cur_pos >= _num_values) {
117
508
        return Status::EndOfFile("no more value to read");
118
508
    }
119
227k
    uint32_t shared_len;
120
227k
    uint32_t non_shared_len;
121
227k
    auto data_ptr = _decode_value_lengths(_next_ptr, &shared_len, &non_shared_len);
122
227k
    if (data_ptr == nullptr) {
123
0
        DCHECK(false) << "[BinaryPrefixPageDecoder::_read_next_value] corruption!";
124
0
        return Status::Corruption("Failed to decode value at position {}", _cur_pos);
125
0
    }
126
227k
    _current_value.resize(shared_len);
127
227k
    _current_value.append(data_ptr, non_shared_len);
128
227k
    _next_ptr = data_ptr + non_shared_len;
129
227k
    return Status::OK();
130
227k
}
131
132
48.1k
Status BinaryPrefixPageDecoder::_seek_to_restart_point(size_t restart_point_index) {
133
48.1k
    _cur_pos = cast_set<uint32_t>(restart_point_index * _restart_point_internal);
134
48.1k
    _next_ptr = _get_restart_point(restart_point_index);
135
48.1k
    return _read_next_value();
136
48.1k
}
137
138
1.36k
Status BinaryPrefixPageDecoder::init() {
139
1.36k
    _cur_pos = 0;
140
1.36k
    _next_ptr = reinterpret_cast<const uint8_t*>(_data.get_data());
141
142
1.36k
    const uint8_t* end = _next_ptr + _data.get_size();
143
1.36k
    _num_restarts = decode_fixed32_le(end - 4);
144
1.36k
    _restarts_ptr = end - (_num_restarts + 1) * 4;
145
1.36k
    _footer_start = _restarts_ptr - 4 - 1;
146
1.36k
    _num_values = decode_fixed32_le(_footer_start);
147
1.36k
    _restart_point_internal = decode_fixed8(_footer_start + 4);
148
1.36k
    _parsed = true;
149
1.36k
    return _read_next_value();
150
1.36k
}
151
152
455
Status BinaryPrefixPageDecoder::seek_to_position_in_page(size_t pos) {
153
455
    DCHECK(_parsed);
154
455
    DCHECK_LE(pos, _num_values);
155
455
    if (_num_values == 0) [[unlikely]] {
156
0
        if (pos != 0) {
157
0
            return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
158
0
                    "seek pos {} is larger than total elements  {}", pos, _num_values);
159
0
        }
160
0
    }
161
    // seek past the last value is valid
162
455
    if (pos == _num_values) {
163
0
        _cur_pos = cast_set<uint32_t>(_num_values);
164
0
        return Status::OK();
165
0
    }
166
167
455
    size_t restart_point_index = pos / _restart_point_internal;
168
455
    RETURN_IF_ERROR(_seek_to_restart_point(restart_point_index));
169
884
    while (_cur_pos < pos) {
170
429
        _cur_pos++;
171
429
        RETURN_IF_ERROR(_read_next_value());
172
429
    }
173
455
    return Status::OK();
174
455
}
175
176
6.17k
Status BinaryPrefixPageDecoder::seek_at_or_after_value(const void* value, bool* exact_match) {
177
6.17k
    DCHECK(_parsed);
178
6.17k
    Slice target = *reinterpret_cast<const Slice*>(value);
179
180
6.17k
    uint32_t left = 0;
181
6.17k
    uint32_t right = _num_restarts;
182
    // find the first restart point >= target. after loop,
183
    // - left == index of first restart point >= target when found
184
    // - left == _num_restarts when not found (all restart points < target)
185
47.6k
    while (left < right) {
186
41.4k
        uint32_t mid = left + (right - left) / 2;
187
        // read first entry at restart point `mid`
188
41.4k
        RETURN_IF_ERROR(_seek_to_restart_point(mid));
189
41.4k
        Slice mid_entry(_current_value);
190
41.4k
        if (mid_entry.compare(target) < 0) {
191
20.2k
            left = mid + 1;
192
21.2k
        } else {
193
21.2k
            right = mid;
194
21.2k
        }
195
41.4k
    }
196
197
    // then linear search from the last restart pointer < target.
198
    // when left == 0, all restart points >= target, so search from first one.
199
    // otherwise search from the last restart point < target, which is left - 1
200
6.17k
    uint32_t search_index = left > 0 ? left - 1 : 0;
201
6.17k
    RETURN_IF_ERROR(_seek_to_restart_point(search_index));
202
50.5k
    while (true) {
203
50.5k
        int cmp = Slice(_current_value).compare(target);
204
50.5k
        if (cmp >= 0) {
205
5.66k
            *exact_match = cmp == 0;
206
5.66k
            return Status::OK();
207
5.66k
        }
208
44.9k
        _cur_pos++;
209
44.9k
        auto st = _read_next_value();
210
44.9k
        if (st.is<ErrorCode::END_OF_FILE>()) {
211
508
            return Status::Error<ErrorCode::ENTRY_NOT_FOUND, false>(
212
508
                    "all value small than the value");
213
508
        }
214
44.4k
        if (!st.ok()) {
215
0
            return st;
216
0
        }
217
44.4k
    }
218
6.17k
}
219
220
920
Status BinaryPrefixPageDecoder::next_batch(size_t* n, MutableColumnPtr& dst) {
221
920
    DCHECK(_parsed);
222
920
    if (*n == 0 || _cur_pos >= _num_values) [[unlikely]] {
223
0
        *n = 0;
224
0
        return Status::OK();
225
0
    }
226
920
    size_t max_fetch = std::min(*n, static_cast<size_t>(_num_values - _cur_pos));
227
228
    // read and copy values
229
135k
    for (size_t i = 0; i < max_fetch; ++i) {
230
134k
        dst->insert_data((char*)(_current_value.data()), _current_value.size());
231
134k
        _cur_pos++;
232
        // reach the end of the page, should not read the next value
233
134k
        if (_cur_pos < _num_values) {
234
133k
            RETURN_IF_ERROR(_read_next_value());
235
133k
        }
236
134k
    }
237
238
920
    *n = max_fetch;
239
920
    return Status::OK();
240
920
}
241
242
} // namespace segment_v2
243
} // namespace doris