be/src/util/lzo_decompressor.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include <crc32c/crc32c.h> |
19 | | |
20 | | #include "common/cast_set.h" |
21 | | #include "common/logging.h" |
22 | | #include "orc/Exceptions.hh" |
23 | | #include "storage/utils.h" |
24 | | #include "util/decompressor.h" |
25 | | |
26 | | namespace orc { |
27 | | /** |
28 | | * Decompress the bytes in to the output buffer. |
29 | | * @param inputAddress the start of the input |
30 | | * @param inputLimit one past the last byte of the input |
31 | | * @param outputAddress the start of the output buffer |
32 | | * @param outputLimit one past the last byte of the output buffer |
33 | | * @result the number of bytes decompressed |
34 | | */ |
35 | | uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress, |
36 | | char* outputLimit); |
37 | | } // namespace orc |
38 | | |
39 | | namespace doris { |
40 | | #include "common/compile_check_begin.h" |
41 | | |
42 | | // Lzop |
43 | | const uint8_t LzopDecompressor::LZOP_MAGIC[9] = {0x89, 0x4c, 0x5a, 0x4f, 0x00, |
44 | | 0x0d, 0x0a, 0x1a, 0x0a}; |
45 | | |
46 | | const uint64_t LzopDecompressor::LZOP_VERSION = 0x1040; |
47 | | const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100; |
48 | | // magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1) |
49 | | // + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1) |
50 | | // without the real file name, extra field and checksum |
51 | | const uint32_t LzopDecompressor::MIN_HEADER_SIZE = 34; |
52 | | const uint32_t LzopDecompressor::LZO_MAX_BLOCK_SIZE = (64 * 1024l * 1024l); |
53 | | |
54 | | const uint32_t LzopDecompressor::CRC32_INIT_VALUE = 0; |
55 | | const uint32_t LzopDecompressor::ADLER32_INIT_VALUE = 1; |
56 | | |
57 | | const uint64_t LzopDecompressor::F_H_CRC32 = 0x00001000L; |
58 | | const uint64_t LzopDecompressor::F_MASK = 0x00003FFFL; |
59 | | const uint64_t LzopDecompressor::F_OS_MASK = 0xff000000L; |
60 | | const uint64_t LzopDecompressor::F_CS_MASK = 0x00f00000L; |
61 | | const uint64_t LzopDecompressor::F_RESERVED = ((F_MASK | F_OS_MASK | F_CS_MASK) ^ 0xffffffffL); |
62 | | const uint64_t LzopDecompressor::F_MULTIPART = 0x00000400L; |
63 | | const uint64_t LzopDecompressor::F_H_FILTER = 0x00000800L; |
64 | | const uint64_t LzopDecompressor::F_H_EXTRA_FIELD = 0x00000040L; |
65 | | const uint64_t LzopDecompressor::F_CRC32_C = 0x00000200L; |
66 | | const uint64_t LzopDecompressor::F_ADLER32_C = 0x00000002L; |
67 | | const uint64_t LzopDecompressor::F_CRC32_D = 0x00000100L; |
68 | | const uint64_t LzopDecompressor::F_ADLER32_D = 0x00000001L; |
69 | | |
70 | 34 | Status LzopDecompressor::init() { |
71 | 34 | return Status::OK(); |
72 | 34 | } |
73 | | |
74 | | Status LzopDecompressor::decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read, |
75 | | uint8_t* output, uint32_t output_max_len, |
76 | | size_t* decompressed_len, bool* stream_end, |
77 | 90 | size_t* more_input_bytes, size_t* more_output_bytes) { |
78 | 90 | if (!_is_header_loaded) { |
79 | | // this is the first time to call lzo decompress, parse the header info first |
80 | 34 | RETURN_IF_ERROR(parse_header_info(input, input_len, input_bytes_read, more_input_bytes)); |
81 | 34 | if (*more_input_bytes > 0) { |
82 | 0 | return Status::OK(); |
83 | 0 | } |
84 | 34 | } |
85 | | |
86 | | // read compressed block |
87 | | // compressed-block ::= |
88 | | // <uncompressed-size> |
89 | | // <compressed-size> |
90 | | // <uncompressed-checksums> |
91 | | // <compressed-checksums> |
92 | | // <compressed-data> |
93 | 90 | size_t left_input_len = input_len - *input_bytes_read; |
94 | 90 | if (left_input_len < sizeof(uint32_t)) { |
95 | | // block is at least have uncompressed_size |
96 | 0 | *more_input_bytes = sizeof(uint32_t) - left_input_len; |
97 | 0 | return Status::OK(); |
98 | 0 | } |
99 | | |
100 | 90 | uint8_t* block_start = input + *input_bytes_read; |
101 | 90 | uint8_t* ptr = block_start; |
102 | | // 1. uncompressed size |
103 | 90 | uint32_t uncompressed_size; |
104 | 90 | ptr = get_uint32(ptr, &uncompressed_size); |
105 | 90 | left_input_len -= sizeof(uint32_t); |
106 | 90 | if (uncompressed_size == 0) { |
107 | 42 | *input_bytes_read += sizeof(uint32_t); |
108 | 42 | *stream_end = true; |
109 | 42 | return Status::OK(); |
110 | 42 | } |
111 | | |
112 | | // 2. compressed size |
113 | 48 | if (left_input_len < sizeof(uint32_t)) { |
114 | 0 | *more_input_bytes = sizeof(uint32_t) - left_input_len; |
115 | 0 | return Status::OK(); |
116 | 0 | } |
117 | | |
118 | 48 | uint32_t compressed_size; |
119 | 48 | ptr = get_uint32(ptr, &compressed_size); |
120 | 48 | left_input_len -= sizeof(uint32_t); |
121 | 48 | if (compressed_size > LZO_MAX_BLOCK_SIZE) { |
122 | 0 | std::stringstream ss; |
123 | 0 | ss << "lzo block size: " << compressed_size |
124 | 0 | << " is greater than LZO_MAX_BLOCK_SIZE: " << LZO_MAX_BLOCK_SIZE; |
125 | 0 | return Status::InternalError(ss.str()); |
126 | 0 | } |
127 | | |
128 | | // 3. out checksum |
129 | 48 | uint32_t out_checksum = 0; |
130 | 48 | if (_header_info.output_checksum_type != CHECK_NONE) { |
131 | 48 | if (left_input_len < sizeof(uint32_t)) { |
132 | 0 | *more_input_bytes = sizeof(uint32_t) - left_input_len; |
133 | 0 | return Status::OK(); |
134 | 0 | } |
135 | | |
136 | 48 | ptr = get_uint32(ptr, &out_checksum); |
137 | 48 | left_input_len -= sizeof(uint32_t); |
138 | 48 | } |
139 | | |
140 | | // 4. in checksum |
141 | 48 | uint32_t in_checksum = 0; |
142 | 48 | if (compressed_size < uncompressed_size && _header_info.input_checksum_type != CHECK_NONE) { |
143 | 12 | if (left_input_len < sizeof(uint32_t)) { |
144 | 0 | *more_input_bytes = sizeof(uint32_t) - left_input_len; |
145 | 0 | return Status::OK(); |
146 | 0 | } |
147 | | |
148 | 12 | ptr = get_uint32(ptr, &in_checksum); |
149 | 12 | left_input_len -= sizeof(uint32_t); |
150 | 36 | } else { |
151 | | // If the compressed data size is equal to the uncompressed data size, then |
152 | | // the uncompressed data is stored and there is no compressed checksum. |
153 | 36 | in_checksum = out_checksum; |
154 | 36 | } |
155 | | |
156 | | // 5. checksum compressed data |
157 | 48 | if (left_input_len < compressed_size) { |
158 | 0 | *more_input_bytes = compressed_size - left_input_len; |
159 | 0 | return Status::OK(); |
160 | 0 | } |
161 | 48 | RETURN_IF_ERROR(checksum(_header_info.input_checksum_type, "compressed", in_checksum, ptr, |
162 | 48 | compressed_size)); |
163 | | |
164 | | // 6. decompress |
165 | 48 | if (output_max_len < uncompressed_size) { |
166 | 0 | *more_output_bytes = uncompressed_size - output_max_len; |
167 | 0 | return Status::OK(); |
168 | 0 | } |
169 | 48 | if (compressed_size == uncompressed_size) { |
170 | | // the data is uncompressed, just copy to the output buf |
171 | 36 | memmove(output, ptr, compressed_size); |
172 | 36 | ptr += compressed_size; |
173 | 36 | } else { |
174 | 12 | try { |
175 | 12 | *decompressed_len = |
176 | 12 | orc::lzoDecompress((const char*)ptr, (const char*)(ptr + compressed_size), |
177 | 12 | (char*)output, (char*)(output + uncompressed_size)); |
178 | 12 | } catch (const orc::ParseError& err) { |
179 | 0 | std::stringstream ss; |
180 | 0 | ss << "Lzo decompression failed: " << err.what(); |
181 | 0 | return Status::InternalError(ss.str()); |
182 | 0 | } |
183 | | |
184 | 12 | RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed", out_checksum, |
185 | 12 | output, *decompressed_len)); |
186 | 12 | ptr += compressed_size; |
187 | 12 | } |
188 | | |
189 | | // 7. done |
190 | 48 | *stream_end = true; |
191 | 48 | *decompressed_len = uncompressed_size; |
192 | 48 | *input_bytes_read += ptr - block_start; |
193 | | |
194 | 48 | VLOG_DEBUG << "finished decompress lzo block." |
195 | 0 | << " compressed_size: " << compressed_size |
196 | 0 | << " decompressed_len: " << *decompressed_len |
197 | 0 | << " input_bytes_read: " << *input_bytes_read; |
198 | | |
199 | 48 | return Status::OK(); |
200 | 48 | } |
201 | | |
202 | | // file-header ::= -- most of this information is not used. |
203 | | // <magic> |
204 | | // <version> |
205 | | // <lib-version> |
206 | | // [<version-needed>] -- present for all modern files. |
207 | | // <method> |
208 | | // <level> |
209 | | // <flags> |
210 | | // <mode> |
211 | | // <mtime> |
212 | | // <file-name> |
213 | | // <header-checksum> |
214 | | // <extra-field> -- presence indicated in flags, not currently used. |
215 | | Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len, |
216 | 34 | size_t* input_bytes_read, size_t* more_input_bytes) { |
217 | 34 | if (input_len < MIN_HEADER_SIZE) { |
218 | 0 | VLOG_NOTICE << "highly recommanded that Lzo header size is larger than " << MIN_HEADER_SIZE |
219 | 0 | << ", or parsing header info may failed." |
220 | 0 | << " only given: " << input_len; |
221 | 0 | *more_input_bytes = MIN_HEADER_SIZE - input_len; |
222 | 0 | return Status::OK(); |
223 | 0 | } |
224 | | |
225 | 34 | uint8_t* ptr = input; |
226 | | // 1. magic |
227 | 34 | if (memcmp(ptr, LZOP_MAGIC, sizeof(LZOP_MAGIC))) { |
228 | 0 | std::stringstream ss; |
229 | 0 | ss << "invalid lzo magic number"; |
230 | 0 | return Status::InternalError(ss.str()); |
231 | 0 | } |
232 | 34 | ptr += sizeof(LZOP_MAGIC); |
233 | 34 | uint8_t* header = ptr; |
234 | | |
235 | | // 2. version |
236 | 34 | ptr = get_uint16(ptr, &_header_info.version); |
237 | 34 | if (_header_info.version > LZOP_VERSION) { |
238 | 0 | std::stringstream ss; |
239 | 0 | ss << "compressed with later version of lzop: " << &_header_info.version |
240 | 0 | << " must be less than: " << LZOP_VERSION; |
241 | 0 | return Status::InternalError(ss.str()); |
242 | 0 | } |
243 | | |
244 | | // 3. lib version |
245 | 34 | ptr = get_uint16(ptr, &_header_info.lib_version); |
246 | 34 | if (_header_info.lib_version < MIN_LZO_VERSION) { |
247 | 0 | std::stringstream ss; |
248 | 0 | ss << "compressed with incompatible lzo version: " << &_header_info.lib_version |
249 | 0 | << "must be at least: " << MIN_LZO_VERSION; |
250 | 0 | return Status::InternalError(ss.str()); |
251 | 0 | } |
252 | | |
253 | | // 4. version needed |
254 | 34 | ptr = get_uint16(ptr, &_header_info.version_needed); |
255 | 34 | if (_header_info.version_needed > LZOP_VERSION) { |
256 | 0 | std::stringstream ss; |
257 | 0 | ss << "compressed with imp incompatible lzo version: " << &_header_info.version |
258 | 0 | << " must be at no more than: " << LZOP_VERSION; |
259 | 0 | return Status::InternalError(ss.str()); |
260 | 0 | } |
261 | | |
262 | | // 5. method |
263 | 34 | ptr = get_uint8(ptr, &_header_info.method); |
264 | 34 | if (_header_info.method < 1 || _header_info.method > 3) { |
265 | 0 | std::stringstream ss; |
266 | 0 | ss << "invalid compression method: " << _header_info.method; |
267 | 0 | return Status::InternalError(ss.str()); |
268 | 0 | } |
269 | | |
270 | | // 6. unsupported level: 7, 8, 9 |
271 | 34 | uint8_t level; |
272 | 34 | ptr = get_uint8(ptr, &level); |
273 | 34 | if (level > 6) { |
274 | 0 | std::stringstream ss; |
275 | 0 | ss << "unsupported lzo level: " << level; |
276 | 0 | return Status::InternalError(ss.str()); |
277 | 0 | } |
278 | | |
279 | | // 7. flags |
280 | 34 | uint32_t flags; |
281 | 34 | ptr = get_uint32(ptr, &flags); |
282 | 34 | if (flags & (F_RESERVED | F_MULTIPART | F_H_FILTER)) { |
283 | 0 | std::stringstream ss; |
284 | 0 | ss << "unsupported lzo flags: " << flags; |
285 | 0 | return Status::InternalError(ss.str()); |
286 | 0 | } |
287 | 34 | _header_info.header_checksum_type = header_type(flags); |
288 | 34 | _header_info.input_checksum_type = input_type(flags); |
289 | 34 | _header_info.output_checksum_type = output_type(flags); |
290 | | |
291 | | // 8. skip mode and mtime |
292 | 34 | ptr += 3 * sizeof(int32_t); |
293 | | |
294 | | // 9. filename |
295 | 34 | uint8_t filename_len; |
296 | 34 | ptr = get_uint8(ptr, &filename_len); |
297 | | |
298 | | // here we already consume (MIN_HEADER_SIZE) |
299 | | // from now we have to check left input is enough for each step |
300 | 34 | size_t left = input_len - (ptr - input); |
301 | 34 | if (left < filename_len) { |
302 | 0 | *more_input_bytes = filename_len - left; |
303 | 0 | return Status::OK(); |
304 | 0 | } |
305 | | |
306 | 34 | _header_info.filename = std::string((char*)ptr, (size_t)filename_len); |
307 | 34 | ptr += filename_len; |
308 | 34 | left -= filename_len; |
309 | | |
310 | | // 10. checksum |
311 | 34 | if (left < sizeof(uint32_t)) { |
312 | 0 | *more_input_bytes = sizeof(uint32_t) - left; |
313 | 0 | return Status::OK(); |
314 | 0 | } |
315 | 34 | uint32_t expected_checksum; |
316 | 34 | uint8_t* cur = ptr; |
317 | 34 | ptr = get_uint32(ptr, &expected_checksum); |
318 | 34 | uint32_t computed_checksum; |
319 | 34 | if (_header_info.header_checksum_type == CHECK_CRC32) { |
320 | 0 | computed_checksum = CRC32_INIT_VALUE; |
321 | 0 | computed_checksum = crc32c::Extend(computed_checksum, (const uint8_t*)header, cur - header); |
322 | 34 | } else { |
323 | 34 | computed_checksum = ADLER32_INIT_VALUE; |
324 | 34 | computed_checksum = olap_adler32(computed_checksum, (const char*)header, cur - header); |
325 | 34 | } |
326 | | |
327 | 34 | if (computed_checksum != expected_checksum) { |
328 | 0 | std::stringstream ss; |
329 | 0 | ss << "invalid header checksum: " << computed_checksum |
330 | 0 | << " expected: " << expected_checksum; |
331 | 0 | return Status::InternalError(ss.str()); |
332 | 0 | } |
333 | 34 | left -= sizeof(uint32_t); |
334 | | |
335 | | // 11. skip extra |
336 | 34 | if (flags & F_H_EXTRA_FIELD) { |
337 | 0 | if (left < sizeof(uint32_t)) { |
338 | 0 | *more_input_bytes = sizeof(uint32_t) - left; |
339 | 0 | return Status::OK(); |
340 | 0 | } |
341 | 0 | uint32_t extra_len; |
342 | 0 | ptr = get_uint32(ptr, &extra_len); |
343 | 0 | left -= sizeof(uint32_t); |
344 | | |
345 | | // add the checksum and the len to the total ptr size. |
346 | 0 | if (left < sizeof(int32_t) + extra_len) { |
347 | 0 | *more_input_bytes = sizeof(int32_t) + extra_len - left; |
348 | 0 | return Status::OK(); |
349 | 0 | } |
350 | 0 | left -= sizeof(int32_t) + extra_len; |
351 | 0 | ptr += sizeof(int32_t) + extra_len; |
352 | 0 | } |
353 | | |
354 | 34 | _header_info.header_size = cast_set<int32_t>(ptr - input); |
355 | 34 | *input_bytes_read = _header_info.header_size; |
356 | | |
357 | 34 | _is_header_loaded = true; |
358 | 34 | VLOG_DEBUG << debug_info(); |
359 | | |
360 | 34 | return Status::OK(); |
361 | 34 | } |
362 | | |
363 | | Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source, uint32_t expected, |
364 | 60 | uint8_t* ptr, size_t len) { |
365 | 60 | uint32_t computed_checksum; |
366 | 60 | switch (type) { |
367 | 0 | case CHECK_NONE: |
368 | 0 | return Status::OK(); |
369 | 0 | case CHECK_CRC32: |
370 | 0 | computed_checksum = crc32c::Extend(CRC32_INIT_VALUE, (const uint8_t*)ptr, len); |
371 | 0 | break; |
372 | 60 | case CHECK_ADLER: |
373 | 60 | computed_checksum = olap_adler32(ADLER32_INIT_VALUE, (const char*)ptr, len); |
374 | 60 | break; |
375 | 0 | default: |
376 | 0 | std::stringstream ss; |
377 | 0 | ss << "Invalid checksum type: " << type; |
378 | 0 | return Status::InternalError(ss.str()); |
379 | 60 | } |
380 | | |
381 | 60 | if (computed_checksum != expected) { |
382 | 0 | std::stringstream ss; |
383 | 0 | ss << "checksum of " << source << " block failed." |
384 | 0 | << " computed checksum: " << computed_checksum << " expected: " << expected; |
385 | 0 | return Status::InternalError(ss.str()); |
386 | 0 | } |
387 | | |
388 | 60 | return Status::OK(); |
389 | 60 | } |
390 | | |
391 | 0 | std::string LzopDecompressor::debug_info() { |
392 | 0 | std::stringstream ss; |
393 | 0 | ss << "LzopDecompressor." |
394 | 0 | << " version: " << _header_info.version << " lib version: " << _header_info.lib_version |
395 | 0 | << " version needed: " << _header_info.version_needed |
396 | 0 | << " method: " << (uint16_t)_header_info.method << " filename: " << _header_info.filename |
397 | 0 | << " header size: " << _header_info.header_size |
398 | 0 | << " header checksum type: " << _header_info.header_checksum_type |
399 | 0 | << " input checksum type: " << _header_info.input_checksum_type |
400 | 0 | << " output checksum type: " << _header_info.output_checksum_type; |
401 | 0 | return ss.str(); |
402 | 0 | } |
403 | | |
404 | | #include "common/compile_check_end.h" |
405 | | } // namespace doris |