Coverage Report

Created: 2026-04-14 12:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/lzo_decompressor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <crc32c/crc32c.h>
19
20
#include "common/cast_set.h"
21
#include "common/logging.h"
22
#include "orc/Exceptions.hh"
23
#include "storage/utils.h"
24
#include "util/decompressor.h"
25
26
namespace orc {
27
/**
 * Decompress the bytes of the input range into the output buffer.
 *
 * Only declared here; the definition is not part of this file.
 *
 * @param inputAddress the start of the input
 * @param inputLimit one past the last byte of the input
 * @param outputAddress the start of the output buffer
 * @param outputLimit one past the last byte of the output buffer
 * @return the number of bytes decompressed
 */
uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
                       char* outputLimit);
37
} // namespace orc
38
39
namespace doris {
40
41
// Lzop
42
const uint8_t LzopDecompressor::LZOP_MAGIC[9] = {0x89, 0x4c, 0x5a, 0x4f, 0x00,
43
                                                 0x0d, 0x0a, 0x1a, 0x0a};
44
45
const uint64_t LzopDecompressor::LZOP_VERSION = 0x1040;
46
const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100;
47
// magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1)
48
// + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1)
49
// without the real file name, extra field and checksum
50
const uint32_t LzopDecompressor::MIN_HEADER_SIZE = 34;
51
const uint32_t LzopDecompressor::LZO_MAX_BLOCK_SIZE = (64 * 1024l * 1024l);
52
53
const uint32_t LzopDecompressor::CRC32_INIT_VALUE = 0;
54
const uint32_t LzopDecompressor::ADLER32_INIT_VALUE = 1;
55
56
const uint64_t LzopDecompressor::F_H_CRC32 = 0x00001000L;
57
const uint64_t LzopDecompressor::F_MASK = 0x00003FFFL;
58
const uint64_t LzopDecompressor::F_OS_MASK = 0xff000000L;
59
const uint64_t LzopDecompressor::F_CS_MASK = 0x00f00000L;
60
const uint64_t LzopDecompressor::F_RESERVED = ((F_MASK | F_OS_MASK | F_CS_MASK) ^ 0xffffffffL);
61
const uint64_t LzopDecompressor::F_MULTIPART = 0x00000400L;
62
const uint64_t LzopDecompressor::F_H_FILTER = 0x00000800L;
63
const uint64_t LzopDecompressor::F_H_EXTRA_FIELD = 0x00000040L;
64
const uint64_t LzopDecompressor::F_CRC32_C = 0x00000200L;
65
const uint64_t LzopDecompressor::F_ADLER32_C = 0x00000002L;
66
const uint64_t LzopDecompressor::F_CRC32_D = 0x00000100L;
67
const uint64_t LzopDecompressor::F_ADLER32_D = 0x00000001L;
68
69
37
// Nothing to prepare up front: the lzop header is parsed by the first
// decompress() call, so this always reports success.
Status LzopDecompressor::init() {
    return Status::OK();
}
72
73
// Decompresses at most one lzop block from `input` into `output`.
//
// The first call also parses the lzop file header (see parse_header_info).
//
// compressed-block ::=
//   <uncompressed-size>
//   <compressed-size>
//   <uncompressed-checksums>
//   <compressed-checksums>
//   <compressed-data>
//
// @param input             buffer holding the header and/or block bytes
// @param input_len         number of valid bytes in `input`
// @param input_bytes_read  in/out: offset in `input` where the block starts; advanced past
//                          everything consumed (header and block) on success
// @param output            destination buffer
// @param output_max_len    capacity of `output`
// @param decompressed_len  out: number of bytes written to `output` for this block
// @param stream_end        out: set to true after a block was produced and when the
//                          zero-sized terminator block was read
// @param more_input_bytes  out: number of additional input bytes needed when `input` does
//                          not hold the complete block; nothing of the block is consumed then
// @param more_output_bytes out: number of additional output bytes needed when `output` is
//                          too small for the block
// @return OK on success or when more input/output is requested; InternalError when the
//         block is malformed, fails a checksum or cannot be decompressed
Status LzopDecompressor::decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read,
                                    uint8_t* output, uint32_t output_max_len,
                                    size_t* decompressed_len, bool* stream_end,
                                    size_t* more_input_bytes, size_t* more_output_bytes) {
    if (!_is_header_loaded) {
        // this is the first time to call lzo decompress, parse the header info first
        RETURN_IF_ERROR(parse_header_info(input, input_len, input_bytes_read, more_input_bytes));
        if (*more_input_bytes > 0) {
            return Status::OK();
        }
    }

    // Guard the unsigned subtraction below against wrapping around.
    if (*input_bytes_read > input_len) {
        std::stringstream ss;
        ss << "lzo input offset: " << *input_bytes_read
           << " is beyond the input length: " << input_len;
        return Status::InternalError(ss.str());
    }

    size_t left_input_len = input_len - *input_bytes_read;
    if (left_input_len < sizeof(uint32_t)) {
        // a block has at least the uncompressed_size field
        *more_input_bytes = sizeof(uint32_t) - left_input_len;
        return Status::OK();
    }

    uint8_t* block_start = input + *input_bytes_read;
    uint8_t* ptr = block_start;

    // 1. uncompressed size
    uint32_t uncompressed_size = 0;
    ptr = get_uint32(ptr, &uncompressed_size);
    left_input_len -= sizeof(uint32_t);
    if (uncompressed_size == 0) {
        // a zero-sized block terminates the stream
        *input_bytes_read += sizeof(uint32_t);
        *stream_end = true;
        return Status::OK();
    }
    if (uncompressed_size > LZO_MAX_BLOCK_SIZE) {
        std::stringstream ss;
        ss << "lzo uncompressed block size: " << uncompressed_size
           << " is greater than LZO_MAX_BLOCK_SIZE: " << LZO_MAX_BLOCK_SIZE;
        return Status::InternalError(ss.str());
    }

    // 2. compressed size
    if (left_input_len < sizeof(uint32_t)) {
        *more_input_bytes = sizeof(uint32_t) - left_input_len;
        return Status::OK();
    }

    uint32_t compressed_size = 0;
    ptr = get_uint32(ptr, &compressed_size);
    left_input_len -= sizeof(uint32_t);
    if (compressed_size > LZO_MAX_BLOCK_SIZE) {
        std::stringstream ss;
        ss << "lzo block size: " << compressed_size
           << " is greater than LZO_MAX_BLOCK_SIZE: " << LZO_MAX_BLOCK_SIZE;
        return Status::InternalError(ss.str());
    }
    // A block is either stored verbatim (sizes equal) or compressed to something
    // strictly smaller; anything else cannot be a well-formed block.
    if (compressed_size == 0 || compressed_size > uncompressed_size) {
        std::stringstream ss;
        ss << "corrupted lzo block, compressed_size: " << compressed_size
           << " uncompressed_size: " << uncompressed_size;
        return Status::InternalError(ss.str());
    }

    // 3. out checksum
    uint32_t out_checksum = 0;
    if (_header_info.output_checksum_type != CHECK_NONE) {
        if (left_input_len < sizeof(uint32_t)) {
            *more_input_bytes = sizeof(uint32_t) - left_input_len;
            return Status::OK();
        }

        ptr = get_uint32(ptr, &out_checksum);
        left_input_len -= sizeof(uint32_t);
    }

    // 4. in checksum
    uint32_t in_checksum = 0;
    if (compressed_size < uncompressed_size && _header_info.input_checksum_type != CHECK_NONE) {
        if (left_input_len < sizeof(uint32_t)) {
            *more_input_bytes = sizeof(uint32_t) - left_input_len;
            return Status::OK();
        }

        ptr = get_uint32(ptr, &in_checksum);
        left_input_len -= sizeof(uint32_t);
    } else {
        // If the compressed data size is equal to the uncompressed data size, then
        // the uncompressed data is stored and there is no compressed checksum.
        in_checksum = out_checksum;
    }

    // 5. checksum compressed data
    if (left_input_len < compressed_size) {
        *more_input_bytes = compressed_size - left_input_len;
        return Status::OK();
    }
    RETURN_IF_ERROR(checksum(_header_info.input_checksum_type, "compressed", in_checksum, ptr,
                             compressed_size));

    // 6. decompress
    if (output_max_len < uncompressed_size) {
        *more_output_bytes = uncompressed_size - output_max_len;
        return Status::OK();
    }
    if (compressed_size == uncompressed_size) {
        // the data is uncompressed, just copy to the output buf
        memmove(output, ptr, compressed_size);
    } else {
        uint64_t produced = 0;
        try {
            produced = orc::lzoDecompress(reinterpret_cast<const char*>(ptr),
                                          reinterpret_cast<const char*>(ptr + compressed_size),
                                          reinterpret_cast<char*>(output),
                                          reinterpret_cast<char*>(output + uncompressed_size));
        } catch (const orc::ParseError& err) {
            std::stringstream ss;
            ss << "Lzo decompression failed: " << err.what();
            return Status::InternalError(ss.str());
        }

        // The block header promised uncompressed_size bytes; reporting that size to the
        // caller after a shorter decode would expose bytes that were never written.
        if (produced != uncompressed_size) {
            std::stringstream ss;
            ss << "Lzo decompression failed: decompressed " << produced
               << " bytes but the block header declares " << uncompressed_size;
            return Status::InternalError(ss.str());
        }

        RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed", out_checksum,
                                 output, produced));
    }
    ptr += compressed_size;

    // 7. done
    *stream_end = true;
    *decompressed_len = uncompressed_size;
    *input_bytes_read += ptr - block_start;

    VLOG_DEBUG << "finished decompress lzo block."
               << " compressed_size: " << compressed_size
               << " decompressed_len: " << *decompressed_len
               << " input_bytes_read: " << *input_bytes_read;

    return Status::OK();
}
200
201
// file-header ::=  -- most of this information is not used.
202
//   <magic>
203
//   <version>
204
//   <lib-version>
205
//   [<version-needed>] -- present for all modern files.
206
//   <method>
207
//   <level>
208
//   <flags>
209
//   <mode>
210
//   <mtime>
211
//   <file-name>
212
//   <header-checksum>
213
//   <extra-field> -- presence indicated in flags, not currently used.
214
Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
215
37
                                           size_t* input_bytes_read, size_t* more_input_bytes) {
216
37
    if (input_len < MIN_HEADER_SIZE) {
217
0
        VLOG_NOTICE << "highly recommanded that Lzo header size is larger than " << MIN_HEADER_SIZE
218
0
                    << ", or parsing header info may failed."
219
0
                    << " only given: " << input_len;
220
0
        *more_input_bytes = MIN_HEADER_SIZE - input_len;
221
0
        return Status::OK();
222
0
    }
223
224
37
    uint8_t* ptr = input;
225
    // 1. magic
226
37
    if (memcmp(ptr, LZOP_MAGIC, sizeof(LZOP_MAGIC))) {
227
0
        std::stringstream ss;
228
0
        ss << "invalid lzo magic number";
229
0
        return Status::InternalError(ss.str());
230
0
    }
231
37
    ptr += sizeof(LZOP_MAGIC);
232
37
    uint8_t* header = ptr;
233
234
    // 2. version
235
37
    ptr = get_uint16(ptr, &_header_info.version);
236
37
    if (_header_info.version > LZOP_VERSION) {
237
0
        std::stringstream ss;
238
0
        ss << "compressed with later version of lzop: " << &_header_info.version
239
0
           << " must be less than: " << LZOP_VERSION;
240
0
        return Status::InternalError(ss.str());
241
0
    }
242
243
    // 3. lib version
244
37
    ptr = get_uint16(ptr, &_header_info.lib_version);
245
37
    if (_header_info.lib_version < MIN_LZO_VERSION) {
246
0
        std::stringstream ss;
247
0
        ss << "compressed with incompatible lzo version: " << &_header_info.lib_version
248
0
           << "must be at least: " << MIN_LZO_VERSION;
249
0
        return Status::InternalError(ss.str());
250
0
    }
251
252
    // 4. version needed
253
37
    ptr = get_uint16(ptr, &_header_info.version_needed);
254
37
    if (_header_info.version_needed > LZOP_VERSION) {
255
0
        std::stringstream ss;
256
0
        ss << "compressed with imp incompatible lzo version: " << &_header_info.version
257
0
           << " must be at no more than: " << LZOP_VERSION;
258
0
        return Status::InternalError(ss.str());
259
0
    }
260
261
    // 5. method
262
37
    ptr = get_uint8(ptr, &_header_info.method);
263
37
    if (_header_info.method < 1 || _header_info.method > 3) {
264
0
        std::stringstream ss;
265
0
        ss << "invalid compression method: " << _header_info.method;
266
0
        return Status::InternalError(ss.str());
267
0
    }
268
269
    // 6. unsupported level: 7, 8, 9
270
37
    uint8_t level;
271
37
    ptr = get_uint8(ptr, &level);
272
37
    if (level > 6) {
273
0
        std::stringstream ss;
274
0
        ss << "unsupported lzo level: " << level;
275
0
        return Status::InternalError(ss.str());
276
0
    }
277
278
    // 7. flags
279
37
    uint32_t flags;
280
37
    ptr = get_uint32(ptr, &flags);
281
37
    if (flags & (F_RESERVED | F_MULTIPART | F_H_FILTER)) {
282
0
        std::stringstream ss;
283
0
        ss << "unsupported lzo flags: " << flags;
284
0
        return Status::InternalError(ss.str());
285
0
    }
286
37
    _header_info.header_checksum_type = header_type(flags);
287
37
    _header_info.input_checksum_type = input_type(flags);
288
37
    _header_info.output_checksum_type = output_type(flags);
289
290
    // 8. skip mode and mtime
291
37
    ptr += 3 * sizeof(int32_t);
292
293
    // 9. filename
294
37
    uint8_t filename_len;
295
37
    ptr = get_uint8(ptr, &filename_len);
296
297
    // here we already consume (MIN_HEADER_SIZE)
298
    // from now we have to check left input is enough for each step
299
37
    size_t left = input_len - (ptr - input);
300
37
    if (left < filename_len) {
301
0
        *more_input_bytes = filename_len - left;
302
0
        return Status::OK();
303
0
    }
304
305
37
    _header_info.filename = std::string((char*)ptr, (size_t)filename_len);
306
37
    ptr += filename_len;
307
37
    left -= filename_len;
308
309
    // 10. checksum
310
37
    if (left < sizeof(uint32_t)) {
311
0
        *more_input_bytes = sizeof(uint32_t) - left;
312
0
        return Status::OK();
313
0
    }
314
37
    uint32_t expected_checksum;
315
37
    uint8_t* cur = ptr;
316
37
    ptr = get_uint32(ptr, &expected_checksum);
317
37
    uint32_t computed_checksum;
318
37
    if (_header_info.header_checksum_type == CHECK_CRC32) {
319
0
        computed_checksum = CRC32_INIT_VALUE;
320
0
        computed_checksum = crc32c::Extend(computed_checksum, (const uint8_t*)header, cur - header);
321
37
    } else {
322
37
        computed_checksum = ADLER32_INIT_VALUE;
323
37
        computed_checksum = olap_adler32(computed_checksum, (const char*)header, cur - header);
324
37
    }
325
326
37
    if (computed_checksum != expected_checksum) {
327
0
        std::stringstream ss;
328
0
        ss << "invalid header checksum: " << computed_checksum
329
0
           << " expected: " << expected_checksum;
330
0
        return Status::InternalError(ss.str());
331
0
    }
332
37
    left -= sizeof(uint32_t);
333
334
    // 11. skip extra
335
37
    if (flags & F_H_EXTRA_FIELD) {
336
0
        if (left < sizeof(uint32_t)) {
337
0
            *more_input_bytes = sizeof(uint32_t) - left;
338
0
            return Status::OK();
339
0
        }
340
0
        uint32_t extra_len;
341
0
        ptr = get_uint32(ptr, &extra_len);
342
0
        left -= sizeof(uint32_t);
343
344
        // add the checksum and the len to the total ptr size.
345
0
        if (left < sizeof(int32_t) + extra_len) {
346
0
            *more_input_bytes = sizeof(int32_t) + extra_len - left;
347
0
            return Status::OK();
348
0
        }
349
0
        left -= sizeof(int32_t) + extra_len;
350
0
        ptr += sizeof(int32_t) + extra_len;
351
0
    }
352
353
37
    _header_info.header_size = cast_set<int32_t>(ptr - input);
354
37
    *input_bytes_read = _header_info.header_size;
355
356
37
    _is_header_loaded = true;
357
37
    VLOG_DEBUG << debug_info();
358
359
37
    return Status::OK();
360
37
}
361
362
// Verifies `len` bytes starting at `ptr` against `expected` using the checksum
// algorithm selected by `type`.
//
// @param type     CHECK_NONE skips verification; CHECK_CRC32 and CHECK_ADLER pick the algorithm
// @param source   label for the data being verified, used only in the error message
// @param expected checksum value to compare against
// @param ptr      start of the data
// @param len      number of bytes to checksum
// @return OK when verification is skipped or the checksum matches; InternalError on an
//         unknown `type` or a mismatch
Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source, uint32_t expected,
                                  uint8_t* ptr, size_t len) {
    if (type == CHECK_NONE) {
        return Status::OK();
    }

    uint32_t actual = 0;
    if (type == CHECK_CRC32) {
        actual = crc32c::Extend(CRC32_INIT_VALUE, static_cast<const uint8_t*>(ptr), len);
    } else if (type == CHECK_ADLER) {
        actual = olap_adler32(ADLER32_INIT_VALUE, reinterpret_cast<const char*>(ptr), len);
    } else {
        std::stringstream msg;
        msg << "Invalid checksum type: " << type;
        return Status::InternalError(msg.str());
    }

    if (actual == expected) {
        return Status::OK();
    }

    std::stringstream msg;
    msg << "checksum of " << source << " block failed."
        << " computed checksum: " << actual << " expected: " << expected;
    return Status::InternalError(msg.str());
}
389
390
0
std::string LzopDecompressor::debug_info() {
391
0
    std::stringstream ss;
392
0
    ss << "LzopDecompressor."
393
0
       << " version: " << _header_info.version << " lib version: " << _header_info.lib_version
394
0
       << " version needed: " << _header_info.version_needed
395
0
       << " method: " << (uint16_t)_header_info.method << " filename: " << _header_info.filename
396
0
       << " header size: " << _header_info.header_size
397
0
       << " header checksum type: " << _header_info.header_checksum_type
398
0
       << " input checksum type: " << _header_info.input_checksum_type
399
0
       << " output checksum type: " << _header_info.output_checksum_type;
400
0
    return ss.str();
401
0
}
402
403
} // namespace doris