Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/lzo_decompressor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <crc32c/crc32c.h>
19
20
#include "common/cast_set.h"
21
#include "common/logging.h"
22
#include "orc/Exceptions.hh"
23
#include "storage/utils.h"
24
#include "util/decompressor.h"
25
26
namespace orc {
/// Decompresses an LZO-compressed byte range into a caller-provided buffer.
///
/// Only the declaration lives here; the definition is provided elsewhere
/// (presumably by the ORC library's LZO support — TODO confirm). Callers in
/// this file wrap the call in a try/catch for orc::ParseError.
///
/// @param inputAddress  first byte of the compressed input
/// @param inputLimit    one past the last byte of the compressed input
/// @param outputAddress first byte of the output buffer
/// @param outputLimit   one past the last byte of the output buffer
/// @return the number of bytes decompressed
uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
                       char* outputLimit);
} // namespace orc
38
39
namespace doris {
40
#include "common/compile_check_begin.h"
41
42
// Lzop
43
const uint8_t LzopDecompressor::LZOP_MAGIC[9] = {0x89, 0x4c, 0x5a, 0x4f, 0x00,
44
                                                 0x0d, 0x0a, 0x1a, 0x0a};
45
46
const uint64_t LzopDecompressor::LZOP_VERSION = 0x1040;
47
const uint64_t LzopDecompressor::MIN_LZO_VERSION = 0x0100;
48
// magic(9) + ver(2) + lib_ver(2) + ver_needed(2) + method(1)
49
// + lvl(1) + flags(4) + mode/mtime(12) + filename_len(1)
50
// without the real file name, extra field and checksum
51
const uint32_t LzopDecompressor::MIN_HEADER_SIZE = 34;
52
const uint32_t LzopDecompressor::LZO_MAX_BLOCK_SIZE = (64 * 1024l * 1024l);
53
54
const uint32_t LzopDecompressor::CRC32_INIT_VALUE = 0;
55
const uint32_t LzopDecompressor::ADLER32_INIT_VALUE = 1;
56
57
const uint64_t LzopDecompressor::F_H_CRC32 = 0x00001000L;
58
const uint64_t LzopDecompressor::F_MASK = 0x00003FFFL;
59
const uint64_t LzopDecompressor::F_OS_MASK = 0xff000000L;
60
const uint64_t LzopDecompressor::F_CS_MASK = 0x00f00000L;
61
const uint64_t LzopDecompressor::F_RESERVED = ((F_MASK | F_OS_MASK | F_CS_MASK) ^ 0xffffffffL);
62
const uint64_t LzopDecompressor::F_MULTIPART = 0x00000400L;
63
const uint64_t LzopDecompressor::F_H_FILTER = 0x00000800L;
64
const uint64_t LzopDecompressor::F_H_EXTRA_FIELD = 0x00000040L;
65
const uint64_t LzopDecompressor::F_CRC32_C = 0x00000200L;
66
const uint64_t LzopDecompressor::F_ADLER32_C = 0x00000002L;
67
const uint64_t LzopDecompressor::F_CRC32_D = 0x00000100L;
68
const uint64_t LzopDecompressor::F_ADLER32_D = 0x00000001L;
69
70
34
// Nothing needs eager setup: the lzop file header is parsed lazily by the first
// decompress() call (guarded by _is_header_loaded).
Status LzopDecompressor::init() { return Status::OK(); }
73
74
// Decompresses the next lzop block from `input` into `output`.
//
// On the first call the lzop file header is parsed first (see parse_header_info()).
// Each block has the layout
//   compressed-block ::=
//     <uncompressed-size>      uint32; 0 marks the end of the stream
//     <compressed-size>        uint32
//     <uncompressed-checksum>  uint32, present unless the output checksum type is CHECK_NONE
//     <compressed-checksum>    uint32, present only when compressed-size < uncompressed-size
//                              and the input checksum type is not CHECK_NONE
//     <compressed-data>        compressed-size bytes; stored verbatim when
//                              compressed-size == uncompressed-size
//
// Non-error outcomes all return Status::OK():
//   * not enough input: *more_input_bytes is set to a lower bound on the extra bytes needed
//     and the block is not consumed;
//   * output buffer too small: *more_output_bytes is set to the shortfall;
//   * end marker: *input_bytes_read advances past it and *stream_end is set to true;
//   * block decoded: *decompressed_len is the uncompressed size, *input_bytes_read advances
//     past the block and *stream_end is set to true.
// Oversized or inconsistent block sizes, checksum mismatches and LZO decode failures
// return InternalError.
Status LzopDecompressor::decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read,
                                    uint8_t* output, uint32_t output_max_len,
                                    size_t* decompressed_len, bool* stream_end,
                                    size_t* more_input_bytes, size_t* more_output_bytes) {
    if (!_is_header_loaded) {
        // this is the first time to call lzo decompress, parse the header info first
        RETURN_IF_ERROR(parse_header_info(input, input_len, input_bytes_read, more_input_bytes));
        if (*more_input_bytes > 0) {
            return Status::OK();
        }
    }

    size_t left_input_len = input_len - *input_bytes_read;
    if (left_input_len < sizeof(uint32_t)) {
        // a block has at least the uncompressed size field
        *more_input_bytes = sizeof(uint32_t) - left_input_len;
        return Status::OK();
    }

    uint8_t* block_start = input + *input_bytes_read;
    uint8_t* ptr = block_start;

    // 1. uncompressed size; 0 is the end-of-stream marker
    uint32_t uncompressed_size;
    ptr = get_uint32(ptr, &uncompressed_size);
    left_input_len -= sizeof(uint32_t);
    if (uncompressed_size == 0) {
        *input_bytes_read += sizeof(uint32_t);
        *stream_end = true;
        return Status::OK();
    }
    // Bound the size before it is used: otherwise a corrupt length would be reported back
    // through *more_output_bytes, asking the caller for a buffer of up to 4 GiB.
    if (uncompressed_size > LZO_MAX_BLOCK_SIZE) {
        std::stringstream ss;
        ss << "lzo uncompressed block size: " << uncompressed_size
           << " is greater than LZO_MAX_BLOCK_SIZE: " << LZO_MAX_BLOCK_SIZE;
        return Status::InternalError(ss.str());
    }

    // 2. compressed size
    if (left_input_len < sizeof(uint32_t)) {
        *more_input_bytes = sizeof(uint32_t) - left_input_len;
        return Status::OK();
    }
    uint32_t compressed_size;
    ptr = get_uint32(ptr, &compressed_size);
    left_input_len -= sizeof(uint32_t);
    if (compressed_size > LZO_MAX_BLOCK_SIZE) {
        std::stringstream ss;
        ss << "lzo block size: " << compressed_size
           << " is greater than LZO_MAX_BLOCK_SIZE: " << LZO_MAX_BLOCK_SIZE;
        return Status::InternalError(ss.str());
    }
    // Incompressible data is stored verbatim (compressed size == uncompressed size), so a
    // compressed payload larger than the uncompressed data can only come from corruption.
    if (compressed_size > uncompressed_size) {
        std::stringstream ss;
        ss << "corrupted lzo block: compressed size: " << compressed_size
           << " is greater than uncompressed size: " << uncompressed_size;
        return Status::InternalError(ss.str());
    }
    const bool is_stored = compressed_size == uncompressed_size;

    // 3. checksum of the uncompressed data
    uint32_t out_checksum = 0;
    if (_header_info.output_checksum_type != CHECK_NONE) {
        if (left_input_len < sizeof(uint32_t)) {
            *more_input_bytes = sizeof(uint32_t) - left_input_len;
            return Status::OK();
        }
        ptr = get_uint32(ptr, &out_checksum);
        left_input_len -= sizeof(uint32_t);
    }

    // 4. checksum of the compressed data; stored blocks carry none
    uint32_t in_checksum = 0;
    if (!is_stored && _header_info.input_checksum_type != CHECK_NONE) {
        if (left_input_len < sizeof(uint32_t)) {
            *more_input_bytes = sizeof(uint32_t) - left_input_len;
            return Status::OK();
        }
        ptr = get_uint32(ptr, &in_checksum);
        left_input_len -= sizeof(uint32_t);
    }

    // 5. the whole payload must be present
    if (left_input_len < compressed_size) {
        *more_input_bytes = compressed_size - left_input_len;
        return Status::OK();
    }

    // 6. the output buffer must hold the whole block. Check this before any checksum work,
    // which would otherwise be repeated when the caller retries with a larger buffer.
    if (output_max_len < uncompressed_size) {
        *more_output_bytes = uncompressed_size - output_max_len;
        return Status::OK();
    }

    // 7. verify and decompress
    if (is_stored) {
        // The payload *is* the uncompressed data, so it is verified against the
        // uncompressed-data checksum using the uncompressed-data checksum algorithm.
        RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed", out_checksum,
                                 ptr, compressed_size));
        memmove(output, ptr, compressed_size);
    } else {
        RETURN_IF_ERROR(checksum(_header_info.input_checksum_type, "compressed", in_checksum, ptr,
                                 compressed_size));
        uint64_t actual_len = 0;
        try {
            actual_len = orc::lzoDecompress(reinterpret_cast<const char*>(ptr),
                                            reinterpret_cast<const char*>(ptr + compressed_size),
                                            reinterpret_cast<char*>(output),
                                            reinterpret_cast<char*>(output + uncompressed_size));
        } catch (const orc::ParseError& err) {
            std::stringstream ss;
            ss << "Lzo decompression failed: " << err.what();
            return Status::InternalError(ss.str());
        }
        // A short decode must not be reported as a full block.
        if (actual_len != uncompressed_size) {
            std::stringstream ss;
            ss << "Lzo decompression failed: decompressed " << actual_len
               << " bytes, expected: " << uncompressed_size;
            return Status::InternalError(ss.str());
        }
        RETURN_IF_ERROR(checksum(_header_info.output_checksum_type, "decompressed", out_checksum,
                                 output, uncompressed_size));
    }
    ptr += compressed_size;

    // 8. done
    *stream_end = true;
    *decompressed_len = uncompressed_size;
    *input_bytes_read += ptr - block_start;

    VLOG_DEBUG << "finished decompress lzo block."
               << " compressed_size: " << compressed_size
               << " decompressed_len: " << *decompressed_len
               << " input_bytes_read: " << *input_bytes_read;

    return Status::OK();
}
201
202
// file-header ::=  -- most of this information is not used.
203
//   <magic>
204
//   <version>
205
//   <lib-version>
206
//   [<version-needed>] -- present for all modern files.
207
//   <method>
208
//   <level>
209
//   <flags>
210
//   <mode>
211
//   <mtime>
212
//   <file-name>
213
//   <header-checksum>
214
//   <extra-field> -- presence indicated in flags, not currently used.
215
Status LzopDecompressor::parse_header_info(uint8_t* input, size_t input_len,
216
34
                                           size_t* input_bytes_read, size_t* more_input_bytes) {
217
34
    if (input_len < MIN_HEADER_SIZE) {
218
0
        VLOG_NOTICE << "highly recommanded that Lzo header size is larger than " << MIN_HEADER_SIZE
219
0
                    << ", or parsing header info may failed."
220
0
                    << " only given: " << input_len;
221
0
        *more_input_bytes = MIN_HEADER_SIZE - input_len;
222
0
        return Status::OK();
223
0
    }
224
225
34
    uint8_t* ptr = input;
226
    // 1. magic
227
34
    if (memcmp(ptr, LZOP_MAGIC, sizeof(LZOP_MAGIC))) {
228
0
        std::stringstream ss;
229
0
        ss << "invalid lzo magic number";
230
0
        return Status::InternalError(ss.str());
231
0
    }
232
34
    ptr += sizeof(LZOP_MAGIC);
233
34
    uint8_t* header = ptr;
234
235
    // 2. version
236
34
    ptr = get_uint16(ptr, &_header_info.version);
237
34
    if (_header_info.version > LZOP_VERSION) {
238
0
        std::stringstream ss;
239
0
        ss << "compressed with later version of lzop: " << &_header_info.version
240
0
           << " must be less than: " << LZOP_VERSION;
241
0
        return Status::InternalError(ss.str());
242
0
    }
243
244
    // 3. lib version
245
34
    ptr = get_uint16(ptr, &_header_info.lib_version);
246
34
    if (_header_info.lib_version < MIN_LZO_VERSION) {
247
0
        std::stringstream ss;
248
0
        ss << "compressed with incompatible lzo version: " << &_header_info.lib_version
249
0
           << "must be at least: " << MIN_LZO_VERSION;
250
0
        return Status::InternalError(ss.str());
251
0
    }
252
253
    // 4. version needed
254
34
    ptr = get_uint16(ptr, &_header_info.version_needed);
255
34
    if (_header_info.version_needed > LZOP_VERSION) {
256
0
        std::stringstream ss;
257
0
        ss << "compressed with imp incompatible lzo version: " << &_header_info.version
258
0
           << " must be at no more than: " << LZOP_VERSION;
259
0
        return Status::InternalError(ss.str());
260
0
    }
261
262
    // 5. method
263
34
    ptr = get_uint8(ptr, &_header_info.method);
264
34
    if (_header_info.method < 1 || _header_info.method > 3) {
265
0
        std::stringstream ss;
266
0
        ss << "invalid compression method: " << _header_info.method;
267
0
        return Status::InternalError(ss.str());
268
0
    }
269
270
    // 6. unsupported level: 7, 8, 9
271
34
    uint8_t level;
272
34
    ptr = get_uint8(ptr, &level);
273
34
    if (level > 6) {
274
0
        std::stringstream ss;
275
0
        ss << "unsupported lzo level: " << level;
276
0
        return Status::InternalError(ss.str());
277
0
    }
278
279
    // 7. flags
280
34
    uint32_t flags;
281
34
    ptr = get_uint32(ptr, &flags);
282
34
    if (flags & (F_RESERVED | F_MULTIPART | F_H_FILTER)) {
283
0
        std::stringstream ss;
284
0
        ss << "unsupported lzo flags: " << flags;
285
0
        return Status::InternalError(ss.str());
286
0
    }
287
34
    _header_info.header_checksum_type = header_type(flags);
288
34
    _header_info.input_checksum_type = input_type(flags);
289
34
    _header_info.output_checksum_type = output_type(flags);
290
291
    // 8. skip mode and mtime
292
34
    ptr += 3 * sizeof(int32_t);
293
294
    // 9. filename
295
34
    uint8_t filename_len;
296
34
    ptr = get_uint8(ptr, &filename_len);
297
298
    // here we already consume (MIN_HEADER_SIZE)
299
    // from now we have to check left input is enough for each step
300
34
    size_t left = input_len - (ptr - input);
301
34
    if (left < filename_len) {
302
0
        *more_input_bytes = filename_len - left;
303
0
        return Status::OK();
304
0
    }
305
306
34
    _header_info.filename = std::string((char*)ptr, (size_t)filename_len);
307
34
    ptr += filename_len;
308
34
    left -= filename_len;
309
310
    // 10. checksum
311
34
    if (left < sizeof(uint32_t)) {
312
0
        *more_input_bytes = sizeof(uint32_t) - left;
313
0
        return Status::OK();
314
0
    }
315
34
    uint32_t expected_checksum;
316
34
    uint8_t* cur = ptr;
317
34
    ptr = get_uint32(ptr, &expected_checksum);
318
34
    uint32_t computed_checksum;
319
34
    if (_header_info.header_checksum_type == CHECK_CRC32) {
320
0
        computed_checksum = CRC32_INIT_VALUE;
321
0
        computed_checksum = crc32c::Extend(computed_checksum, (const uint8_t*)header, cur - header);
322
34
    } else {
323
34
        computed_checksum = ADLER32_INIT_VALUE;
324
34
        computed_checksum = olap_adler32(computed_checksum, (const char*)header, cur - header);
325
34
    }
326
327
34
    if (computed_checksum != expected_checksum) {
328
0
        std::stringstream ss;
329
0
        ss << "invalid header checksum: " << computed_checksum
330
0
           << " expected: " << expected_checksum;
331
0
        return Status::InternalError(ss.str());
332
0
    }
333
34
    left -= sizeof(uint32_t);
334
335
    // 11. skip extra
336
34
    if (flags & F_H_EXTRA_FIELD) {
337
0
        if (left < sizeof(uint32_t)) {
338
0
            *more_input_bytes = sizeof(uint32_t) - left;
339
0
            return Status::OK();
340
0
        }
341
0
        uint32_t extra_len;
342
0
        ptr = get_uint32(ptr, &extra_len);
343
0
        left -= sizeof(uint32_t);
344
345
        // add the checksum and the len to the total ptr size.
346
0
        if (left < sizeof(int32_t) + extra_len) {
347
0
            *more_input_bytes = sizeof(int32_t) + extra_len - left;
348
0
            return Status::OK();
349
0
        }
350
0
        left -= sizeof(int32_t) + extra_len;
351
0
        ptr += sizeof(int32_t) + extra_len;
352
0
    }
353
354
34
    _header_info.header_size = cast_set<int32_t>(ptr - input);
355
34
    *input_bytes_read = _header_info.header_size;
356
357
34
    _is_header_loaded = true;
358
34
    VLOG_DEBUG << debug_info();
359
360
34
    return Status::OK();
361
34
}
362
363
namespace {
// 256-entry lookup table for the standard CRC-32 (reflected polynomial 0xEDB88320),
// wrapped in a struct so it can be built by a constexpr function.
struct LzopCrc32Table {
    uint32_t entries[256];
};

constexpr LzopCrc32Table make_lzop_crc32_table() {
    LzopCrc32Table table {};
    for (uint32_t i = 0; i < 256; ++i) {
        uint32_t c = i;
        for (int bit = 0; bit < 8; ++bit) {
            c = (c & 1U) != 0 ? (0xEDB88320U ^ (c >> 1)) : (c >> 1);
        }
        table.entries[i] = c;
    }
    return table;
}

constexpr LzopCrc32Table kLzopCrc32Table = make_lzop_crc32_table();

// Standard (zlib-compatible) CRC-32 of `len` bytes at `buf`, continuing from `crc`
// (0 starts a new checksum).
uint32_t lzop_crc32(uint32_t crc, const uint8_t* buf, size_t len) {
    crc = ~crc;
    for (size_t i = 0; i < len; ++i) {
        crc = kLzopCrc32Table.entries[(crc ^ buf[i]) & 0xFFU] ^ (crc >> 8);
    }
    return ~crc;
}
} // namespace

// Verifies `len` bytes at `ptr` against `expected` using the algorithm selected by `type`.
// CHECK_NONE always succeeds. `source` only names the data in the error message.
// Returns InternalError for an unknown type or a mismatch.
Status LzopDecompressor::checksum(LzoChecksum type, const std::string& source, uint32_t expected,
                                  uint8_t* ptr, size_t len) {
    uint32_t computed_checksum = 0;
    switch (type) {
    case CHECK_NONE:
        return Status::OK();
    case CHECK_CRC32:
        // lzop computes its CRC32 checksums with the standard zlib CRC-32, not CRC-32C
        // (Castagnoli). crc32c::Extend therefore cannot match checksums written by lzop.
        computed_checksum = lzop_crc32(CRC32_INIT_VALUE, ptr, len);
        break;
    case CHECK_ADLER:
        computed_checksum =
                olap_adler32(ADLER32_INIT_VALUE, reinterpret_cast<const char*>(ptr), len);
        break;
    default: {
        std::stringstream ss;
        ss << "Invalid checksum type: " << type;
        return Status::InternalError(ss.str());
    }
    }

    if (computed_checksum != expected) {
        std::stringstream ss;
        ss << "checksum of " << source << " block failed."
           << " computed checksum: " << computed_checksum << " expected: " << expected;
        return Status::InternalError(ss.str());
    }

    return Status::OK();
}
390
391
0
std::string LzopDecompressor::debug_info() {
392
0
    std::stringstream ss;
393
0
    ss << "LzopDecompressor."
394
0
       << " version: " << _header_info.version << " lib version: " << _header_info.lib_version
395
0
       << " version needed: " << _header_info.version_needed
396
0
       << " method: " << (uint16_t)_header_info.method << " filename: " << _header_info.filename
397
0
       << " header size: " << _header_info.header_size
398
0
       << " header checksum type: " << _header_info.header_checksum_type
399
0
       << " input checksum type: " << _header_info.input_checksum_type
400
0
       << " output checksum type: " << _header_info.output_checksum_type;
401
0
    return ss.str();
402
0
}
403
404
#include "common/compile_check_end.h"
405
} // namespace doris