Coverage Report

Created: 2026-04-10 18:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/decompressor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/decompressor.h"
19
20
#include <strings.h>
21
22
#include <memory>
23
#include <ostream>
24
25
#include "common/cast_set.h"
26
#include "common/logging.h"
27
#include "common/status.h"
28
#include "exec/common/endian.h"
29
30
namespace doris {
31
32
// Instantiates the decompressor matching `type` into *decompressor and runs its init().
// CompressType::UNCOMPRESSED leaves *decompressor holding nullptr and succeeds without
// any initialization. An unrecognized type returns InternalError and leaves
// *decompressor untouched.
Status Decompressor::create_decompressor(CompressType type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    Decompressor* instance = nullptr;
    switch (type) {
    case CompressType::UNCOMPRESSED:
        // Plain data: no decompressor object is needed.
        break;
    case CompressType::GZIP:
        instance = new GzipDecompressor(false);
        break;
    case CompressType::DEFLATE:
        instance = new GzipDecompressor(true);
        break;
    case CompressType::BZIP2:
        instance = new Bzip2Decompressor();
        break;
    case CompressType::ZSTD:
        instance = new ZstdDecompressor();
        break;
    case CompressType::LZ4FRAME:
        instance = new Lz4FrameDecompressor();
        break;
    case CompressType::LZ4BLOCK:
        instance = new Lz4BlockDecompressor();
        break;
    case CompressType::SNAPPYBLOCK:
        instance = new SnappyBlockDecompressor();
        break;
    case CompressType::LZOP:
        instance = new LzopDecompressor();
        break;
    default:
        return Status::InternalError("Unknown compress type: {}", type);
    }
    // Transfer ownership right away so the object is released on every later path.
    decompressor->reset(instance);

    if (*decompressor == nullptr) {
        return Status::OK();
    }
    return (*decompressor)->init();
}
73
74
// Translates a thrift TFileCompressType into the internal CompressType and delegates to
// the CompressType overload. PLAIN and UNKNOWN both map to UNCOMPRESSED, and LZO is
// handled as LZOP. Any other value yields InternalError.
Status Decompressor::create_decompressor(TFileCompressType::type type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    CompressType target = CompressType::UNCOMPRESSED;
    switch (type) {
    case TFileCompressType::PLAIN:
    case TFileCompressType::UNKNOWN:
        target = CompressType::UNCOMPRESSED;
        break;
    case TFileCompressType::GZ:
        target = CompressType::GZIP;
        break;
    case TFileCompressType::LZO:
    case TFileCompressType::LZOP:
        target = CompressType::LZOP;
        break;
    case TFileCompressType::BZ2:
        target = CompressType::BZIP2;
        break;
    case TFileCompressType::ZSTD:
        target = CompressType::ZSTD;
        break;
    case TFileCompressType::LZ4FRAME:
        target = CompressType::LZ4FRAME;
        break;
    case TFileCompressType::LZ4BLOCK:
        target = CompressType::LZ4BLOCK;
        break;
    case TFileCompressType::DEFLATE:
        target = CompressType::DEFLATE;
        break;
    case TFileCompressType::SNAPPYBLOCK:
        target = CompressType::SNAPPYBLOCK;
        break;
    default:
        return Status::InternalError<false>("unknown compress type: {}", type);
    }

    RETURN_IF_ERROR(Decompressor::create_decompressor(target, decompressor));
    return Status::OK();
}
114
115
// Derives the compression codec from a thrift TFileFormatType (CSV variants and PROTO)
// and delegates to the CompressType overload. FORMAT_PROTO and FORMAT_CSV_PLAIN are
// treated as uncompressed. Any other format yields InternalError.
Status Decompressor::create_decompressor(TFileFormatType::type type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    CompressType target = CompressType::UNCOMPRESSED;
    switch (type) {
    case TFileFormatType::FORMAT_PROTO:
    case TFileFormatType::FORMAT_CSV_PLAIN:
        target = CompressType::UNCOMPRESSED;
        break;
    case TFileFormatType::FORMAT_CSV_GZ:
        target = CompressType::GZIP;
        break;
    case TFileFormatType::FORMAT_CSV_BZ2:
        target = CompressType::BZIP2;
        break;
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        target = CompressType::LZ4FRAME;
        break;
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        target = CompressType::LZ4BLOCK;
        break;
    case TFileFormatType::FORMAT_CSV_LZOP:
        target = CompressType::LZOP;
        break;
    case TFileFormatType::FORMAT_CSV_DEFLATE:
        target = CompressType::DEFLATE;
        break;
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        target = CompressType::SNAPPYBLOCK;
        break;
    default:
        return Status::InternalError<false>("unknown compress type: {}", type);
    }

    RETURN_IF_ERROR(Decompressor::create_decompressor(target, decompressor));
    return Status::OK();
}
152
153
0
// Decodes the four bytes at `buf` as a big-endian (most significant byte first)
// unsigned 32-bit integer.
uint32_t Decompressor::_read_int32(uint8_t* buf) {
    uint32_t value = 0;
    for (int i = 0; i < 4; ++i) {
        value = (value << 8) | buf[i];
    }
    return value;
}
156
157
0
// Base-class description; concrete decompressors return their own summary.
std::string Decompressor::debug_info() {
    return std::string {"Decompressor"};
}
160
161
// Gzip
162
GzipDecompressor::GzipDecompressor(bool is_deflate)
163
1.05k
        : Decompressor(is_deflate ? CompressType::DEFLATE : CompressType::GZIP),
164
1.05k
          _is_deflate(is_deflate) {}
165
166
1.05k
// Releases the zlib inflate state. A failure status cannot be reported from a
// destructor, so the return code is deliberately discarded.
GzipDecompressor::~GzipDecompressor() {
    static_cast<void>(inflateEnd(&_z_strm));
}
169
170
1.05k
// Prepares the zlib inflate stream. Deflate uses WINDOW_BITS as-is. Gzip additionally
// ORs in DETECT_CODEC (presumably zlib's "+32" automatic header detection; confirm the
// constant's value in the header). Returns InternalError if inflateInit2() fails.
Status GzipDecompressor::init() {
    _z_strm = {};
    // Z_NULL allocator hooks make zlib use its default allocation functions.
    _z_strm.zalloc = Z_NULL;
    _z_strm.zfree = Z_NULL;
    _z_strm.opaque = Z_NULL;

    const int window_bits = _is_deflate ? WINDOW_BITS : (WINDOW_BITS | DETECT_CODEC);
    const int rc = inflateInit2(&_z_strm, window_bits);
    if (rc >= 0) {
        return Status::OK();
    }
    return Status::InternalError("Failed to do gzip decompress. status code: {}", rc);
}
184
185
// Inflates as much of `input` into `output` as both buffers allow.
//
// After every inflate() call:
// - *input_bytes_read is refreshed with the bytes consumed from `input`.
// - *decompressed_len is refreshed with the bytes written to `output`.
//
// *stream_end becomes true when inflate() reached the end of a stream. The zlib state is
// then reset so that a following concatenated stream can be decoded by later iterations
// or calls. A Z_BUF_ERROR (no progress possible) returns OK, so the caller can retry with
// more input or output space.
// more_input_bytes / more_output_bytes are not used by this codec.
Status GzipDecompressor::decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read,
                                    uint8_t* output, uint32_t output_max_len,
                                    size_t* decompressed_len, bool* stream_end,
                                    size_t* more_input_bytes, size_t* more_output_bytes) {
    _z_strm.next_in = input;
    _z_strm.avail_in = input_len;
    _z_strm.next_out = output;
    _z_strm.avail_out = output_max_len;

    for (;;) {
        // Stop as soon as either buffer is exhausted.
        if (_z_strm.avail_in == 0 || _z_strm.avail_out == 0) {
            break;
        }

        *stream_end = false;
        int rc = inflate(&_z_strm, Z_NO_FLUSH);
        *input_bytes_read = input_len - _z_strm.avail_in;
        *decompressed_len = output_max_len - _z_strm.avail_out;

        VLOG_TRACE << "gzip dec ret: " << rc << " input_bytes_read: " << *input_bytes_read
                   << " decompressed_len: " << *decompressed_len;

        switch (rc) {
        case Z_OK:
            // Progress was made (output may still be zero bytes); loop while room remains.
            continue;
        case Z_BUF_ERROR:
            // inflate() could not make progress with the current buffers; the caller may
            // call again with more input or more output space.
            return Status::OK();
        case Z_STREAM_END:
            *stream_end = true;
            // Reset so that a subsequent stream in the same input can be decoded.
            rc = inflateReset(&_z_strm);
            if (rc == Z_OK) {
                continue;
            }
            break;
        default:
            break;
        }

        // Reaching here means inflate() or inflateReset() failed with `rc`.
        if (_is_deflate) {
            return Status::InternalError("Failed to do deflate decompress. return code: {}", rc);
        }
        return Status::InternalError("Failed to do gzip decompress. return code: {}", rc);
    }

    return Status::OK();
}
247
248
0
std::string GzipDecompressor::debug_info() {
249
0
    std::stringstream ss;
250
0
    ss << "GzipDecompressor."
251
0
       << " is_deflate: " << _is_deflate;
252
0
    return ss.str();
253
0
}
254
255
// Bzip2
256
702
// Releases the bzip2 decompression state; the status code is intentionally ignored.
Bzip2Decompressor::~Bzip2Decompressor() {
    static_cast<void>(BZ2_bzDecompressEnd(&_bz_strm));
}
259
260
702
// Zeroes the bz_stream (null allocator hooks select bzlib's defaults) and initializes
// decompression. Returns InternalError if BZ2_bzDecompressInit() does not return BZ_OK.
Status Bzip2Decompressor::init() {
    bzero(&_bz_strm, sizeof(_bz_strm));
    const int rc = BZ2_bzDecompressInit(&_bz_strm, /*verbosity=*/0, /*small=*/0);
    if (rc == BZ_OK) {
        return Status::OK();
    }
    return Status::InternalError("Failed to do bz2 decompress. status code: {}", rc);
}
269
270
// Decompresses as much of `input` into `output` as both buffers allow.
//
// After every BZ2_bzDecompress() call:
// - *input_bytes_read is refreshed with the bytes consumed from `input`.
// - *decompressed_len is refreshed with the bytes written to `output`.
//
// On BZ_STREAM_END, *stream_end is set and the bzip2 state is torn down and
// re-initialized, so a following concatenated stream can be decoded.
// more_input_bytes / more_output_bytes are not used by this codec.
Status Bzip2Decompressor::decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read,
                                     uint8_t* output, uint32_t output_max_len,
                                     size_t* decompressed_len, bool* stream_end,
                                     size_t* more_input_bytes, size_t* more_output_bytes) {
    _bz_strm.next_in = reinterpret_cast<char*>(input);
    _bz_strm.avail_in = input_len;
    _bz_strm.next_out = reinterpret_cast<char*>(output);
    _bz_strm.avail_out = output_max_len;

    for (;;) {
        // Stop as soon as either buffer is exhausted.
        if (_bz_strm.avail_in == 0 || _bz_strm.avail_out == 0) {
            break;
        }

        *stream_end = false;
        int rc = BZ2_bzDecompress(&_bz_strm);
        *input_bytes_read = input_len - _bz_strm.avail_in;
        *decompressed_len = output_max_len - _bz_strm.avail_out;

        switch (rc) {
        case BZ_OK:
            // Still mid-stream; keep going while both buffers have room.
            continue;
        case BZ_DATA_ERROR:
        case BZ_DATA_ERROR_MAGIC: {
            LOG(INFO) << "input_bytes_read: " << *input_bytes_read
                      << " decompressed_len: " << *decompressed_len;
            return Status::InternalError("Failed to do bz2 decompress. status code: {}", rc);
        }
        case BZ_STREAM_END:
            *stream_end = true;
            // Restart the decoder so a subsequent stream can be processed.
            rc = BZ2_bzDecompressEnd(&_bz_strm);
            if (rc != BZ_OK) {
                return Status::InternalError("Failed to do bz2 decompress. status code: {}", rc);
            }
            rc = BZ2_bzDecompressInit(&_bz_strm, 0, 0);
            if (rc != BZ_OK) {
                return Status::InternalError("Failed to do bz2 decompress. status code: {}", rc);
            }
            continue;
        default:
            return Status::InternalError("Failed to bz2 decompress. status code: {}", rc);
        }
    }

    return Status::OK();
}
311
312
0
// Short description naming this decompressor.
std::string Bzip2Decompressor::debug_info() {
    return std::string {"Bzip2Decompressor."};
}
317
318
766
// Frees the zstd decompression stream created in init(); the result is ignored.
ZstdDecompressor::~ZstdDecompressor() {
    static_cast<void>(ZSTD_freeDStream(_zstd_strm));
}
321
322
766
// Creates the zstd streaming decompression context and prepares it for a new frame.
// Returns InternalError if the context cannot be created or ZSTD_initDStream() fails.
// The context is released in the destructor.
Status ZstdDecompressor::init() {
    _zstd_strm = ZSTD_createDStream();
    if (!_zstd_strm) {
        return Status::InternalError("ZSTD_dctx creation error");
    }
    auto ret = ZSTD_initDStream(_zstd_strm);
    if (ZSTD_isError(ret)) {
        return Status::InternalError("ZSTD_initDStream error: {}", ZSTD_getErrorName(ret));
    }
    return Status::OK();
}
334
335
// Runs a single ZSTD_decompressStream() step over `input` into `output`.
// *input_bytes_read and *decompressed_len receive the bytes consumed and produced.
// *stream_end is true when zstd returns 0, i.e. a frame was fully decoded and flushed.
// more_input_bytes / more_output_bytes are not used by this codec.
Status ZstdDecompressor::decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read,
                                    uint8_t* output, uint32_t output_max_len,
                                    size_t* decompressed_len, bool* stream_end,
                                    size_t* more_input_bytes, size_t* more_output_bytes) {
    ZSTD_inBuffer in_buf {input, input_len, 0};
    ZSTD_outBuffer out_buf {output, output_max_len, 0};

    const size_t rc = ZSTD_decompressStream(_zstd_strm, &out_buf, &in_buf);
    *input_bytes_read = in_buf.pos;
    *decompressed_len = out_buf.pos;

    if (!ZSTD_isError(rc)) {
        *stream_end = (rc == 0);
        return Status::OK();
    }
    return Status::InternalError("Failed to do zstd decompress: {}", ZSTD_getErrorName(rc));
}
355
356
0
// Short description naming this decompressor.
std::string ZstdDecompressor::debug_info() {
    return std::string {"ZstdDecompressor."};
}
361
362
// Lz4Frame
363
// Lz4 version: 1.7.5
364
// define LZ4F_VERSION = 100
365
const unsigned Lz4FrameDecompressor::DORIS_LZ4F_VERSION = 100;
366
367
50
// Frees the LZ4 frame decompression context created in init(); the result is ignored.
Lz4FrameDecompressor::~Lz4FrameDecompressor() {
    static_cast<void>(LZ4F_freeDecompressionContext(_dctx));
}
370
371
50
// Creates the LZ4 frame decompression context and marks the frame header as not yet
// parsed (_expect_dec_buf_size == -1), so the first decompress() call reads it.
Status Lz4FrameDecompressor::init() {
    const size_t rc = LZ4F_createDecompressionContext(&_dctx, DORIS_LZ4F_VERSION);
    if (LZ4F_isError(rc)) {
        return Status::InternalError(std::string("LZ4F_dctx creation error: ") +
                                     LZ4F_getErrorName(rc));
    }

    _expect_dec_buf_size = -1;
    return Status::OK();
}
384
385
// Decompresses one chunk of an LZ4 frame stream.
//
// On the first call (_expect_dec_buf_size == -1), `input` must start at the frame header.
// The header is parsed with LZ4F_getFrameInfo() to learn the frame's maximum block size,
// and the header bytes are counted in *input_bytes_read. Every call then runs a single
// LZ4F_decompress() over the remaining input.
//
// Outputs:
// - *input_bytes_read receives the total bytes consumed.
// - *decompressed_len receives the bytes written to `output`.
// - *stream_end is set when LZ4F_decompress() returns 0, i.e. the frame is complete.
// more_input_bytes / more_output_bytes are not used by this codec.
Status Lz4FrameDecompressor::decompress(uint8_t* input, uint32_t input_len,
                                        size_t* input_bytes_read, uint8_t* output,
                                        uint32_t output_max_len, size_t* decompressed_len,
                                        bool* stream_end, size_t* more_input_bytes,
                                        size_t* more_output_bytes) {
    uint8_t* src = input;
    size_t remaining_input_size = input_len;
    size_t ret = 1;
    *input_bytes_read = 0;

    if (_expect_dec_buf_size == -1) {
        // First call: `input` should point at the head of the compressed data, where the
        // LZ4 frame header lives.
        // NOTE(review): the LZ4 frame spec allows headers of 7 to 19 bytes (4-byte magic
        // plus a 3..15-byte descriptor); confirm 15 is the intended minimum here.
        if (input_len < 15) {
            return Status::InternalError(
                    "Lz4 header size is between 7 and 15 bytes. "
                    "but input size is only: {}",
                    input_len);
        }

        LZ4F_frameInfo_t info;
        // On return, remaining_input_size holds the number of header bytes consumed.
        ret = LZ4F_getFrameInfo(_dctx, &info, src, &remaining_input_size);
        if (LZ4F_isError(ret)) {
            return Status::InternalError("LZ4F_getFrameInfo error: {}",
                                         std::string(LZ4F_getErrorName(ret)));
        }

        _expect_dec_buf_size = get_block_size(&info);
        if (_expect_dec_buf_size == -1) {
            // LZ4F_getFrameInfo() succeeded, so `ret` is not an error code here; report the
            // offending block size id rather than LZ4F_getErrorName(ret).
            return Status::InternalError(
                    "Impossible lz4 block size unless more block sizes are allowed, "
                    "blockSizeID: {}",
                    static_cast<int>(info.blockSizeID));
        }

        *input_bytes_read = remaining_input_size;

        // Skip past the header; the rest of the input is frame payload.
        src += remaining_input_size;
        remaining_input_size = input_len - remaining_input_size;

        LOG(INFO) << "lz4 block size: " << _expect_dec_buf_size;
    }

    // Both sizes are in/out: they hold capacities on entry and consumed/produced bytes on return.
    size_t output_len = output_max_len;
    ret = LZ4F_decompress(_dctx, output, &output_len, src, &remaining_input_size,
                          /* LZ4F_decompressOptions_t */ nullptr);
    if (LZ4F_isError(ret)) {
        return Status::InternalError("Decompression error: {}",
                                     std::string(LZ4F_getErrorName(ret)));
    }

    *input_bytes_read += remaining_input_size;
    *decompressed_len = output_len;
    // A return value of 0 means the frame has been fully decoded.
    *stream_end = (ret == 0);

    return Status::OK();
}
450
451
0
std::string Lz4FrameDecompressor::debug_info() {
452
0
    std::stringstream ss;
453
0
    ss << "Lz4FrameDecompressor."
454
0
       << " expect dec buf size: " << _expect_dec_buf_size
455
0
       << " Lz4 Frame Version: " << DORIS_LZ4F_VERSION;
456
0
    return ss.str();
457
0
}
458
459
50
size_t Lz4FrameDecompressor::get_block_size(const LZ4F_frameInfo_t* info) {
460
50
    switch (info->blockSizeID) {
461
0
    case LZ4F_default:
462
24
    case LZ4F_max64KB:
463
24
        return 1 << 16;
464
26
    case LZ4F_max256KB:
465
26
        return 1 << 18;
466
0
    case LZ4F_max1MB:
467
0
        return 1 << 20;
468
0
    case LZ4F_max4MB:
469
0
        return 1 << 22;
470
0
    default:
471
        // error
472
0
        return -1;
473
50
    }
474
50
}
475
476
/// Lz4BlockDecompressor
477
737
// Nothing to set up: decompress() works purely on its arguments and local state.
Status Lz4BlockDecompressor::init() { return Status::OK(); }
480
481
// Hadoop lz4codec source :
482
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
483
// Example:
484
// OriginData (the original data is divided into several large data blocks):
485
//      large data block1 | large data block2 | large data block3 | ....
486
// Each large data block is divided into several small data blocks.
487
// Suppose a large data block is divided into three small blocks:
488
// large data block1:            | small block1 | small block2 | small block3 |
489
// CompressData:   <A [B1 compress(small block1)] [B2 compress(small block2)] [B3 compress(small block3)]>
490
//
491
// A : original length of the current block of large data block.
492
// sizeof(A) = 4 bytes.
493
// A = length(small block1) + length(small block2) + length(small block3)
494
// Bx : length of the compressed small data block x.
495
// sizeof(Bx) = 4 bytes.
496
// Bx = length(compress(small blockx))
497
// Decompresses Hadoop-style LZ4 block data. Each "large block" is laid out as:
//   - a 4-byte big-endian uncompressed length;
//   - one or more small blocks, each [4-byte big-endian compressed length][lz4 data].
//
// Only whole large blocks are emitted. If the next large block does not fit into the
// remaining output, *more_output_bytes is set to the shortfall. If its input is
// truncated, *more_input_bytes is set. In both cases the large block is rolled back so
// the caller can retry from its start. *stream_end is true when neither more input nor
// more output is requested.
//
// NOTE(review): *decompressed_len, *input_bytes_read, *more_input_bytes and
// *more_output_bytes are accumulated or tested rather than reset here, so callers
// presumably zero them before each call; confirm against callers.
Status Lz4BlockDecompressor::decompress(uint8_t* input, uint32_t input_len,
                                        size_t* input_bytes_read, uint8_t* output,
                                        uint32_t output_max_len, size_t* decompressed_len,
                                        bool* stream_end, size_t* more_input_bytes,
                                        size_t* more_output_bytes) {
    auto* input_ptr = input;
    auto* output_ptr = output;

    while (input_len > 0) {
        if (input_len < sizeof(uint32_t)) {
            *more_input_bytes = sizeof(uint32_t) - input_len;
            break;
        }

        // If this large block cannot be completed, fall back to its beginning.
        auto* large_block_input_ptr = input_ptr;
        auto* large_block_output_ptr = output_ptr;

        uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

        input_ptr += sizeof(uint32_t);
        input_len -= sizeof(uint32_t);

        auto remaining_output_len = cast_set<uint32_t>(output_max_len - *decompressed_len);

        if (remaining_output_len < remaining_decompressed_large_block_len) {
            // Need more output buffer
            *more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;

            break;
        }

        std::size_t decompressed_large_block_len = 0;
        while (remaining_decompressed_large_block_len > 0) {
            // Need at least the 4-byte length prefix of the next small block.
            if (input_len < sizeof(uint32_t)) {
                *more_input_bytes = sizeof(uint32_t) - input_len;
                break;
            }

            // Read the length of the next lz4 compressed block.
            uint32_t compressed_small_block_len = BigEndian::Load32(input_ptr);

            input_ptr += sizeof(uint32_t);
            input_len -= sizeof(uint32_t);

            if (compressed_small_block_len == 0) {
                continue;
            }

            if (compressed_small_block_len > input_len) {
                // Need more input buffer
                *more_input_bytes = compressed_small_block_len - input_len;
                break;
            }

            // Decompress this block. The destination capacity is what is left of the
            // declared large-block length. That region was verified above to fit in the
            // output buffer. As a result, corrupt input can neither write past `output` nor
            // make remaining_decompressed_large_block_len wrap around below.
            auto decompressed_small_block_len = LZ4_decompress_safe(
                    reinterpret_cast<const char*>(input_ptr), reinterpret_cast<char*>(output_ptr),
                    compressed_small_block_len, remaining_decompressed_large_block_len);
            if (decompressed_small_block_len < 0) {
                // LZ4_decompress_safe() signals failure with a negative int, which is not an
                // LZ4F error code, so report the raw value.
                return Status::InvalidArgument("Failed to do Lz4Block decompress, error = {}",
                                               decompressed_small_block_len);
            }
            input_ptr += compressed_small_block_len;
            input_len -= compressed_small_block_len;

            output_ptr += decompressed_small_block_len;
            remaining_decompressed_large_block_len -= decompressed_small_block_len;
            decompressed_large_block_len += decompressed_small_block_len;
        }

        if (*more_input_bytes != 0) {
            // Need more input buffer
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;
            break;
        }

        *decompressed_len += decompressed_large_block_len;
    }
    *input_bytes_read += (input_ptr - input);
    // No request for more input or output means a complete block boundary was reached.
    *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);

    return Status::OK();
}
586
587
0
// Short description naming this decompressor.
std::string Lz4BlockDecompressor::debug_info() {
    return std::string {"Lz4BlockDecompressor."};
}
592
593
/// SnappyBlockDecompressor
594
722
// Nothing to set up: decompress() works purely on its arguments and local state.
Status SnappyBlockDecompressor::init() { return Status::OK(); }
597
598
// Hadoop snappycodec source :
599
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
600
// Example:
601
// OriginData (the original data is divided into several large data blocks):
602
//      large data block1 | large data block2 | large data block3 | ....
603
// Each large data block is divided into several small data blocks.
604
// Suppose a large data block is divided into three small blocks:
605
// large data block1:            | small block1 | small block2 | small block3 |
606
// CompressData:   <A [B1 compress(small block1)] [B2 compress(small block2)] [B3 compress(small block3)]>
607
//
608
// A : original length of the current block of large data block.
609
// sizeof(A) = 4 bytes.
610
// A = length(small block1) + length(small block2) + length(small block3)
611
// Bx : length of the compressed small data block x.
612
// sizeof(Bx) = 4 bytes.
613
// Bx = length(compress(small blockx))
614
// Decompresses Hadoop-style Snappy block data. Each "large block" is laid out as:
//   - a 4-byte big-endian uncompressed length;
//   - one or more small blocks, each [4-byte big-endian compressed length][snappy data].
//
// Only whole large blocks are emitted. If the next large block does not fit into the
// remaining output, *more_output_bytes is set to the shortfall. If its input is
// truncated, *more_input_bytes is set. In both cases the large block is rolled back so
// the caller can retry from its start. *stream_end is true when neither more input nor
// more output is requested.
//
// NOTE(review): *decompressed_len, *input_bytes_read, *more_input_bytes and
// *more_output_bytes are accumulated or tested rather than reset here, so callers
// presumably zero them before each call; confirm against callers.
Status SnappyBlockDecompressor::decompress(uint8_t* input, uint32_t input_len,
                                           size_t* input_bytes_read, uint8_t* output,
                                           uint32_t output_max_len, size_t* decompressed_len,
                                           bool* stream_end, size_t* more_input_bytes,
                                           size_t* more_output_bytes) {
    auto* input_ptr = input;
    auto* output_ptr = output;

    while (input_len > 0) {
        if (input_len < sizeof(uint32_t)) {
            *more_input_bytes = sizeof(uint32_t) - input_len;
            break;
        }

        // If this large block cannot be completed, fall back to its beginning.
        auto* large_block_input_ptr = input_ptr;
        auto* large_block_output_ptr = output_ptr;

        uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

        input_ptr += sizeof(uint32_t);
        input_len -= sizeof(uint32_t);

        std::size_t remaining_output_len = output_max_len - *decompressed_len;

        if (remaining_output_len < remaining_decompressed_large_block_len) {
            // Need more output buffer
            *more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;

            break;
        }

        std::size_t decompressed_large_block_len = 0;
        while (remaining_decompressed_large_block_len > 0) {
            // Need at least the 4-byte length prefix of the next small block.
            if (input_len < sizeof(uint32_t)) {
                *more_input_bytes = sizeof(uint32_t) - input_len;
                break;
            }

            // Read the length of the next snappy compressed block.
            size_t compressed_small_block_len = BigEndian::Load32(input_ptr);

            input_ptr += sizeof(uint32_t);
            input_len -= sizeof(uint32_t);

            if (compressed_small_block_len == 0) {
                continue;
            }

            if (compressed_small_block_len > input_len) {
                // Need more input buffer
                *more_input_bytes = compressed_small_block_len - input_len;
                break;
            }

            // Decompress this block.
            size_t decompressed_small_block_len;
            if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input_ptr),
                                               compressed_small_block_len,
                                               &decompressed_small_block_len)) {
                return Status::InternalError("Failed to do snappy decompress.");
            }
            // RawUncompress() writes decompressed_small_block_len bytes without any bound.
            // Reject small blocks that exceed what is left of the declared large-block
            // length. That region was verified above to fit in the output buffer. This
            // prevents corrupt input from overflowing `output` or wrapping
            // remaining_decompressed_large_block_len.
            if (decompressed_small_block_len > remaining_decompressed_large_block_len) {
                return Status::InternalError(
                        "Failed to do snappy decompress. uncompressed_len: {} exceeds remaining "
                        "large block len: {}",
                        decompressed_small_block_len, remaining_decompressed_large_block_len);
            }
            if (!snappy::RawUncompress(reinterpret_cast<const char*>(input_ptr),
                                       compressed_small_block_len,
                                       reinterpret_cast<char*>(output_ptr))) {
                return Status::InternalError(
                        "Failed to do snappy decompress. uncompressed_len: {}, compressed_len: {}",
                        decompressed_small_block_len, compressed_small_block_len);
            }
            input_ptr += compressed_small_block_len;
            input_len -= compressed_small_block_len;

            output_ptr += decompressed_small_block_len;
            remaining_decompressed_large_block_len -= decompressed_small_block_len;
            decompressed_large_block_len += decompressed_small_block_len;
        }

        if (*more_input_bytes != 0) {
            // Need more input buffer
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;
            break;
        }

        *decompressed_len += decompressed_large_block_len;
    }
    *input_bytes_read += (input_ptr - input);
    // No request for more input or output means a complete block boundary was reached.
    *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);

    return Status::OK();
}
709
710
0
// Short description naming this decompressor.
std::string SnappyBlockDecompressor::debug_info() {
    return std::string {"SnappyBlockDecompressor."};
}
715
716
} // namespace doris