Coverage Report

Created: 2026-03-14 20:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/binary_prefix_page.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/binary_prefix_page.h"
19
20
#include <stddef.h>
21
#include <stdint.h>
22
23
#include <algorithm>
24
#include <vector>
25
26
#include "common/status.h"
27
#include "util/coding.h"
28
#include "util/faststring.h"
29
#include "util/slice.h"
30
31
namespace doris {
32
namespace segment_v2 {
33
#include "common/compile_check_begin.h"
34
35
134k
// Appends the values in `vals` to the page using prefix compression.
//
// `vals` is reinterpreted as an array of `*add_count` Slices. Each value is encoded as
//   varint32 shared_len | varint32 non_shared_len | non_shared bytes
// where shared_len is the length of the common prefix with the previously added value.
// Whenever the running `_count` is a multiple of RESTART_POINT_INTERVAL the value is a
// restart point: it is stored with shared_len = 0 and its buffer offset is recorded in
// `_restart_points_offset`.
//
// Stops early once is_page_full() reports true; on return `*add_count` holds the number
// of values actually appended.
Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) {
    DCHECK(!_finished);
    if (*add_count == 0) {
        return Status::OK();
    }

    const Slice* src = reinterpret_cast<const Slice*>(vals);
    if (_count == 0) {
        // Keep a copy of the page's first value.
        _first_entry.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()),
                                 src->get_size());
    }

    size_t i = 0;
    for (; i < *add_count; ++i, ++src) {
        if (is_page_full()) {
            break;
        }
        const char* entry = src->data;
        size_t entry_len = src->size;
        size_t old_size = _buffer.size();

        size_t share_len = 0;
        if (_count % RESTART_POINT_INTERVAL == 0) {
            // Restart point: the value is stored in full, remember where it starts.
            _restart_points_offset.push_back(cast_set<uint32_t>(old_size));
        } else {
            size_t max_share_len = std::min(_last_entry.size(), entry_len);
            // Compare raw bytes as uint8_t on both sides. `entry` is `const char*`
            // (signed on many platforms); comparing it directly against an unsigned byte
            // type would make equal bytes >= 0x80 compare unequal after integer promotion
            // and cut the shared prefix short. The casts are a no-op if `_last_entry`
            // already yields unsigned bytes.
            while (share_len < max_share_len &&
                   static_cast<uint8_t>(entry[share_len]) ==
                           static_cast<uint8_t>(_last_entry[share_len])) {
                ++share_len;
            }
        }
        size_t non_share_len = entry_len - share_len;
        // Appending may require a large allocation; an allocation failure should surface
        // as an error Status instead of taking down the BE with an OOM.
        RETURN_IF_CATCH_EXCEPTION({
            put_varint32(&_buffer, cast_set<uint32_t>(share_len));
            put_varint32(&_buffer, cast_set<uint32_t>(non_share_len));
            _buffer.append(entry + share_len, non_share_len);

            _last_entry.clear();
            _last_entry.append(entry, entry_len);
        });

        _raw_data_size += entry_len;
        ++_count;
    }
    *add_count = i;
    return Status::OK();
}
88
89
239
// Seals the page and moves the encoded bytes into `slice`.
//
// Footer appended after the encoded entries:
//   fixed32 value count | uint8 restart interval |
//   fixed32 offset of each restart point | fixed32 number of restart points
// Must not be called twice (DCHECKed via `_finished`).
Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) {
    DCHECK(!_finished);
    _finished = true;
    RETURN_IF_CATCH_EXCEPTION({
        // Use cast_set like every other uint32 conversion in this file, rather than a
        // C-style cast that would silently truncate.
        put_fixed32_le(&_buffer, cast_set<uint32_t>(_count));
        uint8_t restart_point_interval = RESTART_POINT_INTERVAL;
        _buffer.append(&restart_point_interval, 1);
        for (const auto offset : _restart_points_offset) {
            put_fixed32_le(&_buffer, offset);
        }
        put_fixed32_le(&_buffer, cast_set<uint32_t>(_restart_points_offset.size()));
        *slice = _buffer.build();
    });
    return Status::OK();
}
105
106
const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared,
107
224k
                                                              uint32_t* non_shared) {
108
224k
    if ((ptr = decode_varint32_ptr(ptr, _footer_start, shared)) == nullptr) {
109
0
        return nullptr;
110
0
    }
111
224k
    if ((ptr = decode_varint32_ptr(ptr, _footer_start, non_shared)) == nullptr) {
112
0
        return nullptr;
113
0
    }
114
224k
    if (_footer_start - ptr < *non_shared) {
115
0
        return nullptr;
116
0
    }
117
224k
    return ptr;
118
224k
}
119
120
224k
// Decodes the entry at `_next_ptr` into `_current_value`: keeps the first `shared_len`
// bytes of the previously decoded value, appends the stored suffix, and advances
// `_next_ptr` past the entry.
// Returns EndOfFile when `_cur_pos` is already at or past the last value, and
// Corruption when the entry header is malformed.
Status BinaryPrefixPageDecoder::_read_next_value() {
    if (_cur_pos >= _num_values) {
        return Status::EndOfFile("no more value to read");
    }
    uint32_t shared_len;
    uint32_t non_shared_len;
    auto data_ptr = _decode_value_lengths(_next_ptr, &shared_len, &non_shared_len);
    if (data_ptr == nullptr) {
        DCHECK(false) << "[BinaryPrefixPageDecoder::_read_next_value] corruption!";
        return Status::Corruption("Failed to decode value at position {}", _cur_pos);
    }
    // The shared prefix is taken from the previously decoded value, so a valid page never
    // stores a longer one (restart entries store 0). A larger value means corrupt data;
    // without this check resize() would pad the value with bytes that were never stored.
    if (shared_len > _current_value.size()) {
        return Status::Corruption(
                "Invalid shared length {} (previous value length {}) at position {}",
                shared_len, _current_value.size(), _cur_pos);
    }
    _current_value.resize(shared_len);
    _current_value.append(data_ptr, non_shared_len);
    _next_ptr = data_ptr + non_shared_len;
    return Status::OK();
}
136
137
46.0k
// Jumps to restart point `restart_point_index` and decodes its entry into
// `_current_value`. The logical position becomes the index times the restart interval.
Status BinaryPrefixPageDecoder::_seek_to_restart_point(size_t restart_point_index) {
    const size_t first_pos_of_restart = restart_point_index * _restart_point_internal;
    _cur_pos = cast_set<uint32_t>(first_pos_of_restart);
    _next_ptr = _get_restart_point(restart_point_index);
    return _read_next_value();
}
142
143
199
// Parses the page footer and positions the decoder on the first value.
//
// Expected layout (as written by BinaryPrefixPageBuilder::finish):
//   entries | fixed32 num_values | uint8 restart interval |
//   fixed32 restart offsets[num_restarts] | fixed32 num_restarts
// Returns Corruption if the page is too small to hold its footer or if the restart
// interval is 0 (it is used as a divisor when seeking). An empty page (num_values == 0)
// is accepted without decoding any entry.
Status BinaryPrefixPageDecoder::init() {
    _cur_pos = 0;
    _next_ptr = reinterpret_cast<const uint8_t*>(_data.get_data());

    const size_t data_size = _data.get_size();
    // Smallest possible footer: num_values(4) + interval(1) + num_restarts(4).
    constexpr size_t kMinFooterSize = 4 + 1 + 4;
    if (data_size < kMinFooterSize) {
        return Status::Corruption("Binary prefix page is too small: {} bytes", data_size);
    }
    const uint8_t* end = _next_ptr + data_size;
    _num_restarts = decode_fixed32_le(end - 4);
    // Computed in size_t so that a corrupt restart count cannot overflow.
    const size_t restarts_and_count_size = (static_cast<size_t>(_num_restarts) + 1) * 4;
    if (data_size < restarts_and_count_size + 4 + 1) {
        return Status::Corruption(
                "Binary prefix page of {} bytes cannot hold a footer with {} restart points",
                data_size, _num_restarts);
    }
    _restarts_ptr = end - restarts_and_count_size;
    _footer_start = _restarts_ptr - 4 - 1;
    _num_values = decode_fixed32_le(_footer_start);
    _restart_point_internal = decode_fixed8(_footer_start + 4);
    if (_restart_point_internal == 0) {
        return Status::Corruption("Binary prefix page has a restart interval of 0");
    }
    _parsed = true;
    if (_num_values == 0) {
        // Nothing to pre-load; seek_to_position_in_page and next_batch handle
        // _num_values == 0 on their own.
        return Status::OK();
    }
    return _read_next_value();
}
156
157
223
// Positions the decoder on the value at index `pos` within the page.
// `pos == _num_values` is valid and leaves the decoder just past the last value.
// Returns INTERNAL_ERROR for any larger `pos`.
Status BinaryPrefixPageDecoder::seek_to_position_in_page(size_t pos) {
    DCHECK(_parsed);
    DCHECK_LE(pos, _num_values);
    // Reject out-of-range positions in release builds as well (this used to be done only
    // for empty pages): otherwise the restart index computed below could point past the
    // restart array.
    if (pos > _num_values) [[unlikely]] {
        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
                "seek pos {} is larger than total elements  {}", pos, _num_values);
    }
    // seek past the last value is valid
    if (pos == _num_values) {
        _cur_pos = cast_set<uint32_t>(_num_values);
        return Status::OK();
    }

    // Start at the closest restart point at or before `pos`, then decode forward.
    size_t restart_point_index = pos / _restart_point_internal;
    RETURN_IF_ERROR(_seek_to_restart_point(restart_point_index));
    while (_cur_pos < pos) {
        _cur_pos++;
        RETURN_IF_ERROR(_read_next_value());
    }
    return Status::OK();
}
180
181
5.23k
// Positions the decoder on the first value >= `*value` (a Slice) and sets
// `*exact_match` to whether that value equals the target.
// Returns ENTRY_NOT_FOUND if every value in the page is smaller than the target.
Status BinaryPrefixPageDecoder::seek_at_or_after_value(const void* value, bool* exact_match) {
    DCHECK(_parsed);
    const Slice target = *reinterpret_cast<const Slice*>(value);

    // Binary search over restart points. Afterwards `lo` is the index of the first
    // restart point whose entry is >= target, or _num_restarts if there is none.
    uint32_t lo = 0;
    uint32_t hi = _num_restarts;
    while (lo < hi) {
        const uint32_t probe = lo + (hi - lo) / 2;
        RETURN_IF_ERROR(_seek_to_restart_point(probe));
        const bool probe_is_smaller = Slice(_current_value).compare(target) < 0;
        if (probe_is_smaller) {
            lo = probe + 1;
        } else {
            hi = probe;
        }
    }

    // Scan linearly from the last restart point < target, or from the first restart
    // point when all of them are >= target.
    RETURN_IF_ERROR(_seek_to_restart_point(lo == 0 ? 0U : lo - 1));
    for (;;) {
        const int cmp = Slice(_current_value).compare(target);
        if (cmp >= 0) {
            *exact_match = (cmp == 0);
            return Status::OK();
        }
        ++_cur_pos;
        Status st = _read_next_value();
        if (st.is<ErrorCode::END_OF_FILE>()) {
            return Status::Error<ErrorCode::ENTRY_NOT_FOUND, false>(
                    "all value small than the value");
        }
        RETURN_IF_ERROR(st);
    }
}
224
225
254
// Copies up to `*n` values, starting at the current position, into `dst`.
// On return `*n` holds the number of values copied (0 if nothing was left).
Status BinaryPrefixPageDecoder::next_batch(size_t* n, MutableColumnPtr& dst) {
    DCHECK(_parsed);
    const bool nothing_to_read = (*n == 0) || (_cur_pos >= _num_values);
    if (nothing_to_read) [[unlikely]] {
        *n = 0;
        return Status::OK();
    }
    const size_t remaining = static_cast<size_t>(_num_values - _cur_pos);
    const size_t to_read = std::min(*n, remaining);

    size_t produced = 0;
    while (produced < to_read) {
        dst->insert_data(reinterpret_cast<const char*>(_current_value.data()),
                         _current_value.size());
        ++produced;
        ++_cur_pos;
        // Do not decode beyond the last value of the page.
        if (_cur_pos < _num_values) {
            RETURN_IF_ERROR(_read_next_value());
        }
    }

    *n = to_read;
    return Status::OK();
}
246
247
#include "common/compile_check_end.h"
248
} // namespace segment_v2
249
} // namespace doris