be/src/storage/segment/binary_prefix_page.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/binary_prefix_page.h" |
19 | | |
20 | | #include <stddef.h> |
21 | | #include <stdint.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <vector> |
25 | | |
26 | | #include "common/status.h" |
27 | | #include "util/coding.h" |
28 | | #include "util/faststring.h" |
29 | | #include "util/slice.h" |
30 | | |
31 | | namespace doris { |
32 | | namespace segment_v2 { |
33 | | |
34 | 134k | Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) { |
35 | 134k | DCHECK(!_finished); |
36 | 134k | if (*add_count == 0) { |
37 | 0 | return Status::OK(); |
38 | 0 | } |
39 | | |
40 | 134k | const Slice* src = reinterpret_cast<const Slice*>(vals); |
41 | | |
42 | 134k | int i = 0; |
43 | 269k | for (; i < *add_count; ++i, ++src) { |
44 | 134k | if (is_page_full()) { |
45 | 0 | break; |
46 | 0 | } |
47 | 134k | const char* entry = src->data; |
48 | 134k | size_t entry_len = src->size; |
49 | 134k | size_t old_size = _buffer.size(); |
50 | | |
51 | 134k | size_t share_len; |
52 | 134k | if (_count % RESTART_POINT_INTERVAL == 0) { |
53 | 8.58k | share_len = 0; |
54 | 8.58k | _restart_points_offset.push_back(cast_set<uint32_t>(old_size)); |
55 | 126k | } else { |
56 | 126k | size_t max_share_len = std::min(_last_entry.size(), entry_len); |
57 | 126k | share_len = max_share_len; |
58 | 262k | for (int j = 0; j < max_share_len; ++j) { |
59 | 262k | if (entry[j] != _last_entry[j]) { |
60 | 126k | share_len = j; |
61 | 126k | break; |
62 | 126k | } |
63 | 262k | } |
64 | 126k | } |
65 | 134k | size_t non_share_len = entry_len - share_len; |
66 | | // This may need a large memory, should return error if could not allocated |
67 | | // successfully, to avoid BE OOM. |
68 | 134k | RETURN_IF_CATCH_EXCEPTION({ |
69 | 134k | put_varint32(&_buffer, cast_set<uint32_t>(share_len)); |
70 | 134k | put_varint32(&_buffer, cast_set<uint32_t>(non_share_len)); |
71 | 134k | _buffer.append(entry + share_len, non_share_len); |
72 | | |
73 | 134k | _last_entry.clear(); |
74 | 134k | _last_entry.append(entry, entry_len); |
75 | 134k | }); |
76 | | |
77 | 134k | _raw_data_size += entry_len; |
78 | 134k | ++_count; |
79 | 134k | } |
80 | 134k | *add_count = i; |
81 | 134k | return Status::OK(); |
82 | 134k | } |
83 | | |
84 | 243 | Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) { |
85 | 243 | DCHECK(!_finished); |
86 | 243 | _finished = true; |
87 | 243 | RETURN_IF_CATCH_EXCEPTION({ |
88 | 243 | put_fixed32_le(&_buffer, (uint32_t)_count); |
89 | 243 | uint8_t restart_point_internal = RESTART_POINT_INTERVAL; |
90 | 243 | _buffer.append(&restart_point_internal, 1); |
91 | 243 | auto restart_point_size = _restart_points_offset.size(); |
92 | 243 | for (uint32_t i = 0; i < restart_point_size; ++i) { |
93 | 243 | put_fixed32_le(&_buffer, _restart_points_offset[i]); |
94 | 243 | } |
95 | 243 | put_fixed32_le(&_buffer, cast_set<uint32_t>(restart_point_size)); |
96 | 243 | *slice = _buffer.build(); |
97 | 243 | }); |
98 | 243 | return Status::OK(); |
99 | 243 | } |
100 | | |
101 | | const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared, |
102 | 224k | uint32_t* non_shared) { |
103 | 224k | if ((ptr = decode_varint32_ptr(ptr, _footer_start, shared)) == nullptr) { |
104 | 0 | return nullptr; |
105 | 0 | } |
106 | 224k | if ((ptr = decode_varint32_ptr(ptr, _footer_start, non_shared)) == nullptr) { |
107 | 0 | return nullptr; |
108 | 0 | } |
109 | 224k | if (_footer_start - ptr < *non_shared) { |
110 | 0 | return nullptr; |
111 | 0 | } |
112 | 224k | return ptr; |
113 | 224k | } |
114 | | |
115 | 224k | Status BinaryPrefixPageDecoder::_read_next_value() { |
116 | 224k | if (_cur_pos >= _num_values) { |
117 | 9 | return Status::EndOfFile("no more value to read"); |
118 | 9 | } |
119 | 224k | uint32_t shared_len; |
120 | 224k | uint32_t non_shared_len; |
121 | 224k | auto data_ptr = _decode_value_lengths(_next_ptr, &shared_len, &non_shared_len); |
122 | 224k | if (data_ptr == nullptr) { |
123 | 0 | DCHECK(false) << "[BinaryPrefixPageDecoder::_read_next_value] corruption!"; |
124 | 0 | return Status::Corruption("Failed to decode value at position {}", _cur_pos); |
125 | 0 | } |
126 | 224k | _current_value.resize(shared_len); |
127 | 224k | _current_value.append(data_ptr, non_shared_len); |
128 | 224k | _next_ptr = data_ptr + non_shared_len; |
129 | 224k | return Status::OK(); |
130 | 224k | } |
131 | | |
132 | 46.0k | Status BinaryPrefixPageDecoder::_seek_to_restart_point(size_t restart_point_index) { |
133 | 46.0k | _cur_pos = cast_set<uint32_t>(restart_point_index * _restart_point_internal); |
134 | 46.0k | _next_ptr = _get_restart_point(restart_point_index); |
135 | 46.0k | return _read_next_value(); |
136 | 46.0k | } |
137 | | |
138 | 203 | Status BinaryPrefixPageDecoder::init() { |
139 | 203 | _cur_pos = 0; |
140 | 203 | _next_ptr = reinterpret_cast<const uint8_t*>(_data.get_data()); |
141 | | |
142 | 203 | const uint8_t* end = _next_ptr + _data.get_size(); |
143 | 203 | _num_restarts = decode_fixed32_le(end - 4); |
144 | 203 | _restarts_ptr = end - (_num_restarts + 1) * 4; |
145 | 203 | _footer_start = _restarts_ptr - 4 - 1; |
146 | 203 | _num_values = decode_fixed32_le(_footer_start); |
147 | 203 | _restart_point_internal = decode_fixed8(_footer_start + 4); |
148 | 203 | _parsed = true; |
149 | 203 | return _read_next_value(); |
150 | 203 | } |
151 | | |
152 | 227 | Status BinaryPrefixPageDecoder::seek_to_position_in_page(size_t pos) { |
153 | 227 | DCHECK(_parsed); |
154 | 227 | DCHECK_LE(pos, _num_values); |
155 | 227 | if (_num_values == 0) [[unlikely]] { |
156 | 0 | if (pos != 0) { |
157 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
158 | 0 | "seek pos {} is larger than total elements {}", pos, _num_values); |
159 | 0 | } |
160 | 0 | } |
161 | | // seek past the last value is valid |
162 | 227 | if (pos == _num_values) { |
163 | 0 | _cur_pos = cast_set<uint32_t>(_num_values); |
164 | 0 | return Status::OK(); |
165 | 0 | } |
166 | | |
167 | 227 | size_t restart_point_index = pos / _restart_point_internal; |
168 | 227 | RETURN_IF_ERROR(_seek_to_restart_point(restart_point_index)); |
169 | 656 | while (_cur_pos < pos) { |
170 | 429 | _cur_pos++; |
171 | 429 | RETURN_IF_ERROR(_read_next_value()); |
172 | 429 | } |
173 | 227 | return Status::OK(); |
174 | 227 | } |
175 | | |
176 | 5.23k | Status BinaryPrefixPageDecoder::seek_at_or_after_value(const void* value, bool* exact_match) { |
177 | 5.23k | DCHECK(_parsed); |
178 | 5.23k | Slice target = *reinterpret_cast<const Slice*>(value); |
179 | | |
180 | 5.23k | uint32_t left = 0; |
181 | 5.23k | uint32_t right = _num_restarts; |
182 | | // find the first restart point >= target. after loop, |
183 | | // - left == index of first restart point >= target when found |
184 | | // - left == _num_restarts when not found (all restart points < target) |
185 | 45.7k | while (left < right) { |
186 | 40.5k | uint32_t mid = left + (right - left) / 2; |
187 | | // read first entry at restart point `mid` |
188 | 40.5k | RETURN_IF_ERROR(_seek_to_restart_point(mid)); |
189 | 40.5k | Slice mid_entry(_current_value); |
190 | 40.5k | if (mid_entry.compare(target) < 0) { |
191 | 19.4k | left = mid + 1; |
192 | 21.1k | } else { |
193 | 21.1k | right = mid; |
194 | 21.1k | } |
195 | 40.5k | } |
196 | | |
197 | | // then linear search from the last restart pointer < target. |
198 | | // when left == 0, all restart points >= target, so search from first one. |
199 | | // otherwise search from the last restart point < target, which is left - 1 |
200 | 5.23k | uint32_t search_index = left > 0 ? left - 1 : 0; |
201 | 5.23k | RETURN_IF_ERROR(_seek_to_restart_point(search_index)); |
202 | 49.1k | while (true) { |
203 | 49.1k | int cmp = Slice(_current_value).compare(target); |
204 | 49.1k | if (cmp >= 0) { |
205 | 5.22k | *exact_match = cmp == 0; |
206 | 5.22k | return Status::OK(); |
207 | 5.22k | } |
208 | 43.9k | _cur_pos++; |
209 | 43.9k | auto st = _read_next_value(); |
210 | 43.9k | if (st.is<ErrorCode::END_OF_FILE>()) { |
211 | 9 | return Status::Error<ErrorCode::ENTRY_NOT_FOUND, false>( |
212 | 9 | "all value small than the value"); |
213 | 9 | } |
214 | 43.8k | if (!st.ok()) { |
215 | 0 | return st; |
216 | 0 | } |
217 | 43.8k | } |
218 | 5.23k | } |
219 | | |
220 | 258 | Status BinaryPrefixPageDecoder::next_batch(size_t* n, MutableColumnPtr& dst) { |
221 | 258 | DCHECK(_parsed); |
222 | 258 | if (*n == 0 || _cur_pos >= _num_values) [[unlikely]] { |
223 | 0 | *n = 0; |
224 | 0 | return Status::OK(); |
225 | 0 | } |
226 | 258 | size_t max_fetch = std::min(*n, static_cast<size_t>(_num_values - _cur_pos)); |
227 | | |
228 | | // read and copy values |
229 | 133k | for (size_t i = 0; i < max_fetch; ++i) { |
230 | 133k | dst->insert_data((char*)(_current_value.data()), _current_value.size()); |
231 | 133k | _cur_pos++; |
232 | | // reach the end of the page, should not read the next value |
233 | 133k | if (_cur_pos < _num_values) { |
234 | 133k | RETURN_IF_ERROR(_read_next_value()); |
235 | 133k | } |
236 | 133k | } |
237 | | |
238 | 258 | *n = max_fetch; |
239 | 258 | return Status::OK(); |
240 | 258 | } |
241 | | |
242 | | } // namespace segment_v2 |
243 | | } // namespace doris |