be/src/storage/segment/binary_prefix_page.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/binary_prefix_page.h" |
19 | | |
20 | | #include <stddef.h> |
21 | | #include <stdint.h> |
22 | | |
23 | | #include <algorithm> |
24 | | #include <vector> |
25 | | |
26 | | #include "common/status.h" |
27 | | #include "util/coding.h" |
28 | | #include "util/faststring.h" |
29 | | #include "util/slice.h" |
30 | | |
31 | | namespace doris { |
32 | | namespace segment_v2 { |
33 | | |
34 | 134k | Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) { |
35 | 134k | DCHECK(!_finished); |
36 | 134k | if (*add_count == 0) { |
37 | 0 | return Status::OK(); |
38 | 0 | } |
39 | | |
40 | 134k | const Slice* src = reinterpret_cast<const Slice*>(vals); |
41 | 134k | if (_count == 0) { |
42 | 243 | _first_entry.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()), |
43 | 243 | src->get_size()); |
44 | 243 | } |
45 | | |
46 | 134k | int i = 0; |
47 | 269k | for (; i < *add_count; ++i, ++src) { |
48 | 134k | if (is_page_full()) { |
49 | 0 | break; |
50 | 0 | } |
51 | 134k | const char* entry = src->data; |
52 | 134k | size_t entry_len = src->size; |
53 | 134k | size_t old_size = _buffer.size(); |
54 | | |
55 | 134k | size_t share_len; |
56 | 134k | if (_count % RESTART_POINT_INTERVAL == 0) { |
57 | 8.58k | share_len = 0; |
58 | 8.58k | _restart_points_offset.push_back(cast_set<uint32_t>(old_size)); |
59 | 126k | } else { |
60 | 126k | size_t max_share_len = std::min(_last_entry.size(), entry_len); |
61 | 126k | share_len = max_share_len; |
62 | 262k | for (int j = 0; j < max_share_len; ++j) { |
63 | 262k | if (entry[j] != _last_entry[j]) { |
64 | 126k | share_len = j; |
65 | 126k | break; |
66 | 126k | } |
67 | 262k | } |
68 | 126k | } |
69 | 134k | size_t non_share_len = entry_len - share_len; |
70 | | // This may need a large memory, should return error if could not allocated |
71 | | // successfully, to avoid BE OOM. |
72 | 134k | RETURN_IF_CATCH_EXCEPTION({ |
73 | 134k | put_varint32(&_buffer, cast_set<uint32_t>(share_len)); |
74 | 134k | put_varint32(&_buffer, cast_set<uint32_t>(non_share_len)); |
75 | 134k | _buffer.append(entry + share_len, non_share_len); |
76 | | |
77 | 134k | _last_entry.clear(); |
78 | 134k | _last_entry.append(entry, entry_len); |
79 | 134k | }); |
80 | | |
81 | 134k | _raw_data_size += entry_len; |
82 | 134k | ++_count; |
83 | 134k | } |
84 | 134k | *add_count = i; |
85 | 134k | return Status::OK(); |
86 | 134k | } |
87 | | |
88 | 243 | Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) { |
89 | 243 | DCHECK(!_finished); |
90 | 243 | _finished = true; |
91 | 243 | RETURN_IF_CATCH_EXCEPTION({ |
92 | 243 | put_fixed32_le(&_buffer, (uint32_t)_count); |
93 | 243 | uint8_t restart_point_internal = RESTART_POINT_INTERVAL; |
94 | 243 | _buffer.append(&restart_point_internal, 1); |
95 | 243 | auto restart_point_size = _restart_points_offset.size(); |
96 | 243 | for (uint32_t i = 0; i < restart_point_size; ++i) { |
97 | 243 | put_fixed32_le(&_buffer, _restart_points_offset[i]); |
98 | 243 | } |
99 | 243 | put_fixed32_le(&_buffer, cast_set<uint32_t>(restart_point_size)); |
100 | 243 | *slice = _buffer.build(); |
101 | 243 | }); |
102 | 243 | return Status::OK(); |
103 | 243 | } |
104 | | |
105 | | const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared, |
106 | 224k | uint32_t* non_shared) { |
107 | 224k | if ((ptr = decode_varint32_ptr(ptr, _footer_start, shared)) == nullptr) { |
108 | 0 | return nullptr; |
109 | 0 | } |
110 | 224k | if ((ptr = decode_varint32_ptr(ptr, _footer_start, non_shared)) == nullptr) { |
111 | 0 | return nullptr; |
112 | 0 | } |
113 | 224k | if (_footer_start - ptr < *non_shared) { |
114 | 0 | return nullptr; |
115 | 0 | } |
116 | 224k | return ptr; |
117 | 224k | } |
118 | | |
119 | 224k | Status BinaryPrefixPageDecoder::_read_next_value() { |
120 | 224k | if (_cur_pos >= _num_values) { |
121 | 9 | return Status::EndOfFile("no more value to read"); |
122 | 9 | } |
123 | 224k | uint32_t shared_len; |
124 | 224k | uint32_t non_shared_len; |
125 | 224k | auto data_ptr = _decode_value_lengths(_next_ptr, &shared_len, &non_shared_len); |
126 | 224k | if (data_ptr == nullptr) { |
127 | 0 | DCHECK(false) << "[BinaryPrefixPageDecoder::_read_next_value] corruption!"; |
128 | 0 | return Status::Corruption("Failed to decode value at position {}", _cur_pos); |
129 | 0 | } |
130 | 224k | _current_value.resize(shared_len); |
131 | 224k | _current_value.append(data_ptr, non_shared_len); |
132 | 224k | _next_ptr = data_ptr + non_shared_len; |
133 | 224k | return Status::OK(); |
134 | 224k | } |
135 | | |
136 | 46.0k | Status BinaryPrefixPageDecoder::_seek_to_restart_point(size_t restart_point_index) { |
137 | 46.0k | _cur_pos = cast_set<uint32_t>(restart_point_index * _restart_point_internal); |
138 | 46.0k | _next_ptr = _get_restart_point(restart_point_index); |
139 | 46.0k | return _read_next_value(); |
140 | 46.0k | } |
141 | | |
142 | 203 | Status BinaryPrefixPageDecoder::init() { |
143 | 203 | _cur_pos = 0; |
144 | 203 | _next_ptr = reinterpret_cast<const uint8_t*>(_data.get_data()); |
145 | | |
146 | 203 | const uint8_t* end = _next_ptr + _data.get_size(); |
147 | 203 | _num_restarts = decode_fixed32_le(end - 4); |
148 | 203 | _restarts_ptr = end - (_num_restarts + 1) * 4; |
149 | 203 | _footer_start = _restarts_ptr - 4 - 1; |
150 | 203 | _num_values = decode_fixed32_le(_footer_start); |
151 | 203 | _restart_point_internal = decode_fixed8(_footer_start + 4); |
152 | 203 | _parsed = true; |
153 | 203 | return _read_next_value(); |
154 | 203 | } |
155 | | |
156 | 227 | Status BinaryPrefixPageDecoder::seek_to_position_in_page(size_t pos) { |
157 | 227 | DCHECK(_parsed); |
158 | 227 | DCHECK_LE(pos, _num_values); |
159 | 227 | if (_num_values == 0) [[unlikely]] { |
160 | 0 | if (pos != 0) { |
161 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR, false>( |
162 | 0 | "seek pos {} is larger than total elements {}", pos, _num_values); |
163 | 0 | } |
164 | 0 | } |
165 | | // seek past the last value is valid |
166 | 227 | if (pos == _num_values) { |
167 | 0 | _cur_pos = cast_set<uint32_t>(_num_values); |
168 | 0 | return Status::OK(); |
169 | 0 | } |
170 | | |
171 | 227 | size_t restart_point_index = pos / _restart_point_internal; |
172 | 227 | RETURN_IF_ERROR(_seek_to_restart_point(restart_point_index)); |
173 | 656 | while (_cur_pos < pos) { |
174 | 429 | _cur_pos++; |
175 | 429 | RETURN_IF_ERROR(_read_next_value()); |
176 | 429 | } |
177 | 227 | return Status::OK(); |
178 | 227 | } |
179 | | |
180 | 5.23k | Status BinaryPrefixPageDecoder::seek_at_or_after_value(const void* value, bool* exact_match) { |
181 | 5.23k | DCHECK(_parsed); |
182 | 5.23k | Slice target = *reinterpret_cast<const Slice*>(value); |
183 | | |
184 | 5.23k | uint32_t left = 0; |
185 | 5.23k | uint32_t right = _num_restarts; |
186 | | // find the first restart point >= target. after loop, |
187 | | // - left == index of first restart point >= target when found |
188 | | // - left == _num_restarts when not found (all restart points < target) |
189 | 45.7k | while (left < right) { |
190 | 40.5k | uint32_t mid = left + (right - left) / 2; |
191 | | // read first entry at restart point `mid` |
192 | 40.5k | RETURN_IF_ERROR(_seek_to_restart_point(mid)); |
193 | 40.5k | Slice mid_entry(_current_value); |
194 | 40.5k | if (mid_entry.compare(target) < 0) { |
195 | 19.4k | left = mid + 1; |
196 | 21.1k | } else { |
197 | 21.1k | right = mid; |
198 | 21.1k | } |
199 | 40.5k | } |
200 | | |
201 | | // then linear search from the last restart pointer < target. |
202 | | // when left == 0, all restart points >= target, so search from first one. |
203 | | // otherwise search from the last restart point < target, which is left - 1 |
204 | 5.23k | uint32_t search_index = left > 0 ? left - 1 : 0; |
205 | 5.23k | RETURN_IF_ERROR(_seek_to_restart_point(search_index)); |
206 | 49.1k | while (true) { |
207 | 49.1k | int cmp = Slice(_current_value).compare(target); |
208 | 49.1k | if (cmp >= 0) { |
209 | 5.22k | *exact_match = cmp == 0; |
210 | 5.22k | return Status::OK(); |
211 | 5.22k | } |
212 | 43.9k | _cur_pos++; |
213 | 43.9k | auto st = _read_next_value(); |
214 | 43.9k | if (st.is<ErrorCode::END_OF_FILE>()) { |
215 | 9 | return Status::Error<ErrorCode::ENTRY_NOT_FOUND, false>( |
216 | 9 | "all value small than the value"); |
217 | 9 | } |
218 | 43.8k | if (!st.ok()) { |
219 | 0 | return st; |
220 | 0 | } |
221 | 43.8k | } |
222 | 5.23k | } |
223 | | |
224 | 258 | Status BinaryPrefixPageDecoder::next_batch(size_t* n, MutableColumnPtr& dst) { |
225 | 258 | DCHECK(_parsed); |
226 | 258 | if (*n == 0 || _cur_pos >= _num_values) [[unlikely]] { |
227 | 0 | *n = 0; |
228 | 0 | return Status::OK(); |
229 | 0 | } |
230 | 258 | size_t max_fetch = std::min(*n, static_cast<size_t>(_num_values - _cur_pos)); |
231 | | |
232 | | // read and copy values |
233 | 133k | for (size_t i = 0; i < max_fetch; ++i) { |
234 | 133k | dst->insert_data((char*)(_current_value.data()), _current_value.size()); |
235 | 133k | _cur_pos++; |
236 | | // reach the end of the page, should not read the next value |
237 | 133k | if (_cur_pos < _num_values) { |
238 | 133k | RETURN_IF_ERROR(_read_next_value()); |
239 | 133k | } |
240 | 133k | } |
241 | | |
242 | 258 | *n = max_fetch; |
243 | 258 | return Status::OK(); |
244 | 258 | } |
245 | | |
246 | | } // namespace segment_v2 |
247 | | } // namespace doris |