be/src/storage/segment/binary_prefix_page.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/binary_prefix_page.h" |
19 | | |
20 | | #include <stddef.h> |
21 | | #include <stdint.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <vector> |
25 | | |
26 | | #include "common/status.h" |
27 | | #include "util/coding.h" |
28 | | #include "util/faststring.h" |
29 | | #include "util/slice.h" |
30 | | |
31 | | namespace doris { |
32 | | namespace segment_v2 { |
33 | | #include "common/compile_check_begin.h" |
34 | | |
35 | 134k | Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) { |
36 | 134k | DCHECK(!_finished); |
37 | 134k | if (*add_count == 0) { |
38 | 0 | return Status::OK(); |
39 | 0 | } |
40 | | |
41 | 134k | const Slice* src = reinterpret_cast<const Slice*>(vals); |
42 | 134k | if (_count == 0) { |
43 | 239 | _first_entry.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()), |
44 | 239 | src->get_size()); |
45 | 239 | } |
46 | | |
47 | 134k | int i = 0; |
48 | 269k | for (; i < *add_count; ++i, ++src) { |
49 | 134k | if (is_page_full()) { |
50 | 0 | break; |
51 | 0 | } |
52 | 134k | const char* entry = src->data; |
53 | 134k | size_t entry_len = src->size; |
54 | 134k | size_t old_size = _buffer.size(); |
55 | | |
56 | 134k | size_t share_len; |
57 | 134k | if (_count % RESTART_POINT_INTERVAL == 0) { |
58 | 8.58k | share_len = 0; |
59 | 8.58k | _restart_points_offset.push_back(cast_set<uint32_t>(old_size)); |
60 | 126k | } else { |
61 | 126k | size_t max_share_len = std::min(_last_entry.size(), entry_len); |
62 | 126k | share_len = max_share_len; |
63 | 262k | for (int j = 0; j < max_share_len; ++j) { |
64 | 262k | if (entry[j] != _last_entry[j]) { |
65 | 126k | share_len = j; |
66 | 126k | break; |
67 | 126k | } |
68 | 262k | } |
69 | 126k | } |
70 | 134k | size_t non_share_len = entry_len - share_len; |
71 | | // This may need a large memory, should return error if could not allocated |
72 | | // successfully, to avoid BE OOM. |
73 | 134k | RETURN_IF_CATCH_EXCEPTION({ |
74 | 134k | put_varint32(&_buffer, cast_set<uint32_t>(share_len)); |
75 | 134k | put_varint32(&_buffer, cast_set<uint32_t>(non_share_len)); |
76 | 134k | _buffer.append(entry + share_len, non_share_len); |
77 | | |
78 | 134k | _last_entry.clear(); |
79 | 134k | _last_entry.append(entry, entry_len); |
80 | 134k | }); |
81 | | |
82 | 134k | _raw_data_size += entry_len; |
83 | 134k | ++_count; |
84 | 134k | } |
85 | 134k | *add_count = i; |
86 | 134k | return Status::OK(); |
87 | 134k | } |
88 | | |
89 | 239 | Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) { |
90 | 239 | DCHECK(!_finished); |
91 | 239 | _finished = true; |
92 | 239 | RETURN_IF_CATCH_EXCEPTION({ |
93 | 239 | put_fixed32_le(&_buffer, (uint32_t)_count); |
94 | 239 | uint8_t restart_point_internal = RESTART_POINT_INTERVAL; |
95 | 239 | _buffer.append(&restart_point_internal, 1); |
96 | 239 | auto restart_point_size = _restart_points_offset.size(); |
97 | 239 | for (uint32_t i = 0; i < restart_point_size; ++i) { |
98 | 239 | put_fixed32_le(&_buffer, _restart_points_offset[i]); |
99 | 239 | } |
100 | 239 | put_fixed32_le(&_buffer, cast_set<uint32_t>(restart_point_size)); |
101 | 239 | *slice = _buffer.build(); |
102 | 239 | }); |
103 | 239 | return Status::OK(); |
104 | 239 | } |
105 | | |
106 | | const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared, |
107 | 224k | uint32_t* non_shared) { |
108 | 224k | if ((ptr = decode_varint32_ptr(ptr, _footer_start, shared)) == nullptr) { |
109 | 0 | return nullptr; |
110 | 0 | } |
111 | 224k | if ((ptr = decode_varint32_ptr(ptr, _footer_start, non_shared)) == nullptr) { |
112 | 0 | return nullptr; |
113 | 0 | } |
114 | 224k | if (_footer_start - ptr < *non_shared) { |
115 | 0 | return nullptr; |
116 | 0 | } |
117 | 224k | return ptr; |
118 | 224k | } |
119 | | |
120 | 224k | Status BinaryPrefixPageDecoder::_read_next_value() { |
121 | 224k | if (_cur_pos >= _num_values) { |
122 | 9 | return Status::EndOfFile("no more value to read"); |
123 | 9 | } |
124 | 224k | uint32_t shared_len; |
125 | 224k | uint32_t non_shared_len; |
126 | 224k | auto data_ptr = _decode_value_lengths(_next_ptr, &shared_len, &non_shared_len); |
127 | 224k | if (data_ptr == nullptr) { |
128 | 0 | DCHECK(false) << "[BinaryPrefixPageDecoder::_read_next_value] corruption!"; |
129 | 0 | return Status::Corruption("Failed to decode value at position {}", _cur_pos); |
130 | 0 | } |
131 | 224k | _current_value.resize(shared_len); |
132 | 224k | _current_value.append(data_ptr, non_shared_len); |
133 | 224k | _next_ptr = data_ptr + non_shared_len; |
134 | 224k | return Status::OK(); |
135 | 224k | } |
136 | | |
137 | 46.0k | Status BinaryPrefixPageDecoder::_seek_to_restart_point(size_t restart_point_index) { |
138 | 46.0k | _cur_pos = cast_set<uint32_t>(restart_point_index * _restart_point_internal); |
139 | 46.0k | _next_ptr = _get_restart_point(restart_point_index); |
140 | 46.0k | return _read_next_value(); |
141 | 46.0k | } |
142 | | |
143 | 199 | Status BinaryPrefixPageDecoder::init() { |
144 | 199 | _cur_pos = 0; |
145 | 199 | _next_ptr = reinterpret_cast<const uint8_t*>(_data.get_data()); |
146 | | |
147 | 199 | const uint8_t* end = _next_ptr + _data.get_size(); |
148 | 199 | _num_restarts = decode_fixed32_le(end - 4); |
149 | 199 | _restarts_ptr = end - (_num_restarts + 1) * 4; |
150 | 199 | _footer_start = _restarts_ptr - 4 - 1; |
151 | 199 | _num_values = decode_fixed32_le(_footer_start); |
152 | 199 | _restart_point_internal = decode_fixed8(_footer_start + 4); |
153 | 199 | _parsed = true; |
154 | 199 | return _read_next_value(); |
155 | 199 | } |
156 | | |
157 | 223 | Status BinaryPrefixPageDecoder::seek_to_position_in_page(size_t pos) { |
158 | 223 | DCHECK(_parsed); |
159 | 223 | DCHECK_LE(pos, _num_values); |
160 | 223 | if (_num_values == 0) [[unlikely]] { |
161 | 0 | if (pos != 0) { |
162 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
163 | 0 | "seek pos {} is larger than total elements {}", pos, _num_values); |
164 | 0 | } |
165 | 0 | } |
166 | | // seek past the last value is valid |
167 | 223 | if (pos == _num_values) { |
168 | 0 | _cur_pos = cast_set<uint32_t>(_num_values); |
169 | 0 | return Status::OK(); |
170 | 0 | } |
171 | | |
172 | 223 | size_t restart_point_index = pos / _restart_point_internal; |
173 | 223 | RETURN_IF_ERROR(_seek_to_restart_point(restart_point_index)); |
174 | 652 | while (_cur_pos < pos) { |
175 | 429 | _cur_pos++; |
176 | 429 | RETURN_IF_ERROR(_read_next_value()); |
177 | 429 | } |
178 | 223 | return Status::OK(); |
179 | 223 | } |
180 | | |
181 | 5.23k | Status BinaryPrefixPageDecoder::seek_at_or_after_value(const void* value, bool* exact_match) { |
182 | 5.23k | DCHECK(_parsed); |
183 | 5.23k | Slice target = *reinterpret_cast<const Slice*>(value); |
184 | | |
185 | 5.23k | uint32_t left = 0; |
186 | 5.23k | uint32_t right = _num_restarts; |
187 | | // find the first restart point >= target. after loop, |
188 | | // - left == index of first restart point >= target when found |
189 | | // - left == _num_restarts when not found (all restart points < target) |
190 | 45.7k | while (left < right) { |
191 | 40.5k | uint32_t mid = left + (right - left) / 2; |
192 | | // read first entry at restart point `mid` |
193 | 40.5k | RETURN_IF_ERROR(_seek_to_restart_point(mid)); |
194 | 40.5k | Slice mid_entry(_current_value); |
195 | 40.5k | if (mid_entry.compare(target) < 0) { |
196 | 19.4k | left = mid + 1; |
197 | 21.1k | } else { |
198 | 21.1k | right = mid; |
199 | 21.1k | } |
200 | 40.5k | } |
201 | | |
202 | | // then linear search from the last restart pointer < target. |
203 | | // when left == 0, all restart points >= target, so search from first one. |
204 | | // otherwise search from the last restart point < target, which is left - 1 |
205 | 5.23k | uint32_t search_index = left > 0 ? left - 1 : 0; |
206 | 5.23k | RETURN_IF_ERROR(_seek_to_restart_point(search_index)); |
207 | 49.1k | while (true) { |
208 | 49.1k | int cmp = Slice(_current_value).compare(target); |
209 | 49.1k | if (cmp >= 0) { |
210 | 5.22k | *exact_match = cmp == 0; |
211 | 5.22k | return Status::OK(); |
212 | 5.22k | } |
213 | 43.9k | _cur_pos++; |
214 | 43.9k | auto st = _read_next_value(); |
215 | 43.9k | if (st.is<ErrorCode::END_OF_FILE>()) { |
216 | 9 | return Status::Error<ErrorCode::ENTRY_NOT_FOUND, false>( |
217 | 9 | "all value small than the value"); |
218 | 9 | } |
219 | 43.8k | if (!st.ok()) { |
220 | 0 | return st; |
221 | 0 | } |
222 | 43.8k | } |
223 | 5.23k | } |
224 | | |
225 | 254 | Status BinaryPrefixPageDecoder::next_batch(size_t* n, MutableColumnPtr& dst) { |
226 | 254 | DCHECK(_parsed); |
227 | 254 | if (*n == 0 || _cur_pos >= _num_values) [[unlikely]] { |
228 | 0 | *n = 0; |
229 | 0 | return Status::OK(); |
230 | 0 | } |
231 | 254 | size_t max_fetch = std::min(*n, static_cast<size_t>(_num_values - _cur_pos)); |
232 | | |
233 | | // read and copy values |
234 | 133k | for (size_t i = 0; i < max_fetch; ++i) { |
235 | 133k | dst->insert_data((char*)(_current_value.data()), _current_value.size()); |
236 | 133k | _cur_pos++; |
237 | | // reach the end of the page, should not read the next value |
238 | 133k | if (_cur_pos < _num_values) { |
239 | 133k | RETURN_IF_ERROR(_read_next_value()); |
240 | 133k | } |
241 | 133k | } |
242 | | |
243 | 254 | *n = max_fetch; |
244 | 254 | return Status::OK(); |
245 | 254 | } |
246 | | |
247 | | #include "common/compile_check_end.h" |
248 | | } // namespace segment_v2 |
249 | | } // namespace doris |