Coverage Report

Created: 2026-03-16 12:03

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/decompressor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/decompressor.h"
19
20
#include <strings.h>
21
22
#include <memory>
23
#include <ostream>
24
25
#include "common/cast_set.h"
26
#include "common/logging.h"
27
#include "common/status.h"
28
#include "exec/common/endian.h"
29
30
namespace doris {
31
#include "common/compile_check_begin.h"
32
33
// Factory: instantiates the decompressor that corresponds to `type` and hands
// ownership to `*decompressor`.
//   - UNCOMPRESSED: `*decompressor` is reset to null and OK is returned.
//   - any other known type: the object is created, init() is invoked on it and
//     the status of init() is returned.
//   - anything else: InternalError is returned and `*decompressor` is untouched.
Status Decompressor::create_decompressor(CompressType type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    switch (type) {
    case CompressType::UNCOMPRESSED:
        decompressor->reset(nullptr);
        break;
    case CompressType::GZIP:
        decompressor->reset(new GzipDecompressor(false));
        break;
    case CompressType::DEFLATE:
        // DEFLATE shares the gzip implementation, switched by the constructor flag.
        decompressor->reset(new GzipDecompressor(true));
        break;
    case CompressType::BZIP2:
        decompressor->reset(new Bzip2Decompressor());
        break;
    case CompressType::ZSTD:
        decompressor->reset(new ZstdDecompressor());
        break;
    case CompressType::LZ4FRAME:
        decompressor->reset(new Lz4FrameDecompressor());
        break;
    case CompressType::LZ4BLOCK:
        decompressor->reset(new Lz4BlockDecompressor());
        break;
    case CompressType::SNAPPYBLOCK:
        decompressor->reset(new SnappyBlockDecompressor());
        break;
    case CompressType::LZOP:
        decompressor->reset(new LzopDecompressor());
        break;
    default:
        return Status::InternalError("Unknown compress type: {}", type);
    }

    // No codec selected means there is nothing to initialize.
    if (*decompressor == nullptr) {
        return Status::OK();
    }
    return (*decompressor)->init();
}
74
75
// Overload keyed on the thrift TFileCompressType: translates it to the internal
// CompressType and delegates to the CompressType overload.
// PLAIN and UNKNOWN both map to UNCOMPRESSED; LZO and LZOP both map to LZOP.
// Values without a mapping produce an InternalError.
Status Decompressor::create_decompressor(TFileCompressType::type type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    CompressType mapped;
    switch (type) {
    case TFileCompressType::PLAIN:
    case TFileCompressType::UNKNOWN:
        mapped = CompressType::UNCOMPRESSED;
        break;
    case TFileCompressType::GZ:
        mapped = CompressType::GZIP;
        break;
    case TFileCompressType::LZO:
    case TFileCompressType::LZOP:
        mapped = CompressType::LZOP;
        break;
    case TFileCompressType::BZ2:
        mapped = CompressType::BZIP2;
        break;
    case TFileCompressType::ZSTD:
        mapped = CompressType::ZSTD;
        break;
    case TFileCompressType::LZ4FRAME:
        mapped = CompressType::LZ4FRAME;
        break;
    case TFileCompressType::LZ4BLOCK:
        mapped = CompressType::LZ4BLOCK;
        break;
    case TFileCompressType::DEFLATE:
        mapped = CompressType::DEFLATE;
        break;
    case TFileCompressType::SNAPPYBLOCK:
        mapped = CompressType::SNAPPYBLOCK;
        break;
    default:
        return Status::InternalError<false>("unknown compress type: {}", type);
    }

    // The delegate's status (success or failure) is the result of this call.
    return Decompressor::create_decompressor(mapped, decompressor);
}
115
116
// Overload keyed on the thrift TFileFormatType: derives the compression codec
// from the file format and delegates to the CompressType overload.
// FORMAT_PROTO and FORMAT_CSV_PLAIN are treated as uncompressed. Formats
// without a mapping produce an InternalError.
Status Decompressor::create_decompressor(TFileFormatType::type type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    CompressType mapped;
    switch (type) {
    case TFileFormatType::FORMAT_PROTO:
    case TFileFormatType::FORMAT_CSV_PLAIN:
        mapped = CompressType::UNCOMPRESSED;
        break;
    case TFileFormatType::FORMAT_CSV_GZ:
        mapped = CompressType::GZIP;
        break;
    case TFileFormatType::FORMAT_CSV_BZ2:
        mapped = CompressType::BZIP2;
        break;
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        mapped = CompressType::LZ4FRAME;
        break;
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        mapped = CompressType::LZ4BLOCK;
        break;
    case TFileFormatType::FORMAT_CSV_LZOP:
        mapped = CompressType::LZOP;
        break;
    case TFileFormatType::FORMAT_CSV_DEFLATE:
        mapped = CompressType::DEFLATE;
        break;
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        mapped = CompressType::SNAPPYBLOCK;
        break;
    default:
        return Status::InternalError<false>("unknown compress type: {}", type);
    }

    // The delegate's status (success or failure) is the result of this call.
    return Decompressor::create_decompressor(mapped, decompressor);
}
153
154
0
// Decodes the four bytes starting at `buf` as a big-endian unsigned 32-bit
// integer (buf[0] is the most significant byte).
uint32_t Decompressor::_read_int32(uint8_t* buf) {
    // Widen every byte to uint32_t before shifting: a uint8_t operand is promoted
    // to (signed) int, so `buf[0] << 24` would shift into the sign bit whenever
    // buf[0] >= 0x80.
    return (static_cast<uint32_t>(buf[0]) << 24) | (static_cast<uint32_t>(buf[1]) << 16) |
           (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
}
157
158
0
// Returns a fixed label identifying the base decompressor.
std::string Decompressor::debug_info() {
    return std::string("Decompressor");
}
161
162
// Gzip
163
// One class serves both codecs: `is_deflate` selects the CompressType reported
// to the base class and is remembered for init() and for error messages.
GzipDecompressor::GzipDecompressor(bool is_deflate)
        : Decompressor(!is_deflate ? CompressType::GZIP : CompressType::DEFLATE),
          _is_deflate(is_deflate) {}
166
167
1.08k
// Tears down the zlib inflate stream; the result of inflateEnd() is
// deliberately discarded.
GzipDecompressor::~GzipDecompressor() {
    static_cast<void>(inflateEnd(&_z_strm));
}
170
171
1.08k
// Resets the zlib stream and prepares it for inflating.
// Returns InternalError when inflateInit2() reports a negative code.
Status GzipDecompressor::init() {
    _z_strm = {};
    _z_strm.zalloc = Z_NULL;
    _z_strm.zfree = Z_NULL;
    _z_strm.opaque = Z_NULL;

    // Deflate mode uses WINDOW_BITS as is; gzip mode additionally ORs in
    // DETECT_CODEC (both constants are defined outside this file).
    int window_bits = WINDOW_BITS;
    if (!_is_deflate) {
        window_bits |= DETECT_CODEC;
    }

    const int rc = inflateInit2(&_z_strm, window_bits);
    if (rc >= 0) {
        return Status::OK();
    }
    return Status::InternalError("Failed to do gzip decompress. status code: {}", rc);
}
185
186
// Inflates as much of `input` into `output` as both buffers allow.
//
// On every inflate() call the out-parameters are refreshed:
//   *input_bytes_read - bytes of `input` consumed so far in this call
//   *decompressed_len - bytes written to `output` so far in this call
//   *stream_end       - true when the last inflate() call ended a stream
// When either length is zero the loop body never runs and none of the
// out-parameters is written. `more_input_bytes` and `more_output_bytes` are
// not used by this codec.
Status GzipDecompressor::decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read,
                                    uint8_t* output, uint32_t output_max_len,
                                    size_t* decompressed_len, bool* stream_end,
                                    size_t* more_input_bytes, size_t* more_output_bytes) {
    // Point the zlib stream at the caller's buffers.
    _z_strm.next_in = input;
    _z_strm.avail_in = input_len;
    _z_strm.next_out = output;
    _z_strm.avail_out = output_max_len;

    while (_z_strm.avail_in > 0 && _z_strm.avail_out > 0) {
        *stream_end = false;

        // inflate() consumes input from next_in and/or produces output at
        // next_out, updating the avail_* counters. Z_OK means some progress
        // was made; note that *decompressed_len may still be zero in that case.
        int rc = inflate(&_z_strm, Z_NO_FLUSH);
        *input_bytes_read = input_len - _z_strm.avail_in;
        *decompressed_len = output_max_len - _z_strm.avail_out;

        VLOG_TRACE << "gzip dec ret: " << rc << " input_bytes_read: " << *input_bytes_read
                   << " decompressed_len: " << *decompressed_len;

        if (rc == Z_BUF_ERROR) {
            // No progress is possible right now; the caller may retry with more
            // input or more output space.
            return Status::OK();
        }

        if (rc == Z_STREAM_END) {
            *stream_end = true;
            // Reset the stream so a subsequent concatenated stream can be decoded.
            rc = inflateReset(&_z_strm);
        }

        // Here rc is either the inflate() result or, after a stream end, the
        // inflateReset() result; anything but Z_OK is fatal.
        if (rc != Z_OK) {
            if (_is_deflate) {
                return Status::InternalError("Failed to do deflate decompress. return code: {}",
                                             rc);
            }
            return Status::InternalError("Failed to do gzip decompress. return code: {}", rc);
        }
        // rc == Z_OK: keep looping while both buffers still have room.
    }

    return Status::OK();
}
248
249
0
std::string GzipDecompressor::debug_info() {
250
0
    std::stringstream ss;
251
0
    ss << "GzipDecompressor."
252
0
       << " is_deflate: " << _is_deflate;
253
0
    return ss.str();
254
0
}
255
256
// Bzip2
257
776
// Releases the bzip2 stream; the result of BZ2_bzDecompressEnd() is discarded.
Bzip2Decompressor::~Bzip2Decompressor() {
    static_cast<void>(BZ2_bzDecompressEnd(&_bz_strm));
}
260
261
776
// Zeroes the bzip2 stream state and initializes it for decompression.
// Returns InternalError when BZ2_bzDecompressInit() does not report BZ_OK.
Status Bzip2Decompressor::init() {
    _bz_strm = {};
    const int rc = BZ2_bzDecompressInit(&_bz_strm, 0, 0);
    if (rc == BZ_OK) {
        return Status::OK();
    }
    return Status::InternalError("Failed to do bz2 decompress. status code: {}", rc);
}
270
271
// Decompresses as much of `input` into `output` as both buffers allow.
//
// After each BZ2_bzDecompress() call *input_bytes_read and *decompressed_len
// hold the totals for this call, and *stream_end tells whether that call
// finished a stream. When a stream ends the bzip2 state is torn down and
// re-created so a following concatenated stream can be decoded.
// `more_input_bytes` and `more_output_bytes` are not used by this codec.
Status Bzip2Decompressor::decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read,
                                     uint8_t* output, uint32_t output_max_len,
                                     size_t* decompressed_len, bool* stream_end,
                                     size_t* more_input_bytes, size_t* more_output_bytes) {
    // Point the bzip2 stream at the caller's buffers.
    _bz_strm.next_in = reinterpret_cast<char*>(input);
    _bz_strm.avail_in = input_len;
    _bz_strm.next_out = reinterpret_cast<char*>(output);
    _bz_strm.avail_out = output_max_len;

    while (_bz_strm.avail_in > 0 && _bz_strm.avail_out > 0) {
        *stream_end = false;

        int rc = BZ2_bzDecompress(&_bz_strm);
        *input_bytes_read = input_len - _bz_strm.avail_in;
        *decompressed_len = output_max_len - _bz_strm.avail_out;

        switch (rc) {
        case BZ_OK:
            // Progress was made; loop again while both buffers have room.
            break;
        case BZ_DATA_ERROR:
        case BZ_DATA_ERROR_MAGIC:
            LOG(INFO) << "input_bytes_read: " << *input_bytes_read
                      << " decompressed_len: " << *decompressed_len;
            return Status::InternalError("Failed to do bz2 decompress. status code: {}", rc);
        case BZ_STREAM_END:
            *stream_end = true;
            // Recycle the stream state for a possible next stream.
            rc = BZ2_bzDecompressEnd(&_bz_strm);
            if (rc != BZ_OK) {
                return Status::InternalError("Failed to do bz2 decompress. status code: {}", rc);
            }
            rc = BZ2_bzDecompressInit(&_bz_strm, 0, 0);
            if (rc != BZ_OK) {
                return Status::InternalError("Failed to do bz2 decompress. status code: {}", rc);
            }
            break;
        default:
            return Status::InternalError("Failed to bz2 decompress. status code: {}", rc);
        }
    }

    return Status::OK();
}
312
313
0
// Returns a fixed label identifying this decompressor.
std::string Bzip2Decompressor::debug_info() {
    return std::string("Bzip2Decompressor.");
}
318
319
718
// Frees the zstd decompression stream; the return value is discarded.
ZstdDecompressor::~ZstdDecompressor() {
    static_cast<void>(ZSTD_freeDStream(_zstd_strm));
}
322
323
718
// Creates and initializes the zstd decompression stream.
// Returns InternalError when the stream cannot be created or when
// ZSTD_initDStream() reports an error code.
Status ZstdDecompressor::init() {
    _zstd_strm = ZSTD_createDStream();
    if (_zstd_strm == nullptr) {
        return Status::InternalError("ZSTD_dctx creation error");
    }

    const size_t ret = ZSTD_initDStream(_zstd_strm);
    if (ZSTD_isError(ret)) {
        return Status::InternalError("ZSTD_initDStream error: {}", ZSTD_getErrorName(ret));
    }
    return Status::OK();
}
335
336
// Runs a single ZSTD_decompressStream() step over the caller's buffers.
//
// *input_bytes_read and *decompressed_len receive the positions reached in the
// input and output buffers (written even when an error is returned).
// *stream_end is set to true exactly when zstd returns 0.
// `more_input_bytes` and `more_output_bytes` are not used by this codec.
Status ZstdDecompressor::decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read,
                                    uint8_t* output, uint32_t output_max_len,
                                    size_t* decompressed_len, bool* stream_end,
                                    size_t* more_input_bytes, size_t* more_output_bytes) {
    // Wrap the caller's buffers in zstd's cursor structs, both starting at 0.
    ZSTD_inBuffer in_buf;
    in_buf.src = input;
    in_buf.size = input_len;
    in_buf.pos = 0;

    ZSTD_outBuffer out_buf;
    out_buf.dst = output;
    out_buf.size = output_max_len;
    out_buf.pos = 0;

    const size_t rc = ZSTD_decompressStream(_zstd_strm, &out_buf, &in_buf);
    *input_bytes_read = in_buf.pos;
    *decompressed_len = out_buf.pos;

    if (ZSTD_isError(rc)) {
        return Status::InternalError("Failed to do zstd decompress: {}", ZSTD_getErrorName(rc));
    }

    *stream_end = (rc == 0);
    return Status::OK();
}
356
357
0
// Returns a fixed label identifying this decompressor.
std::string ZstdDecompressor::debug_info() {
    return std::string("ZstdDecompressor.");
}
362
363
// Lz4Frame
364
// Lz4 version: 1.7.5
365
// define LZ4F_VERSION = 100
366
// Version number handed to LZ4F_createDecompressionContext() in init().
const unsigned Lz4FrameDecompressor::DORIS_LZ4F_VERSION = 100;
367
368
50
// Frees the LZ4 frame decompression context; the return value is discarded.
Lz4FrameDecompressor::~Lz4FrameDecompressor() {
    static_cast<void>(LZ4F_freeDecompressionContext(_dctx));
}
371
372
50
// Creates the LZ4 frame decompression context and marks the expected
// decompress buffer size as "not determined yet" (-1); decompress() fills it
// in from the frame header on its first call.
Status Lz4FrameDecompressor::init() {
    const size_t rc = LZ4F_createDecompressionContext(&_dctx, DORIS_LZ4F_VERSION);
    if (LZ4F_isError(rc)) {
        std::string msg = "LZ4F_dctx creation error: ";
        msg += LZ4F_getErrorName(rc);
        return Status::InternalError(msg);
    }

    // -1 is the sentinel decompress() checks to know the header is unread.
    _expect_dec_buf_size = -1;

    return Status::OK();
}
385
386
// Decompresses LZ4 frame data from `input` into `output`.
//
// On the first call (while _expect_dec_buf_size is still the -1 sentinel set by
// init()) `input` must start at the beginning of the frame: the frame header is
// parsed, the block size is recorded, and decompression continues right after
// the header. At least 15 input bytes are required for that first call.
//
// Out-parameters:
//   *input_bytes_read - header bytes (first call only) plus payload bytes consumed
//   *decompressed_len - bytes written to `output`
//   *stream_end       - true when LZ4F_decompress() returns 0
// `more_input_bytes` and `more_output_bytes` are not used by this codec.
// NOTE(review): the recorded block size is only logged here; `output_max_len`
// is not compared against it.
Status Lz4FrameDecompressor::decompress(uint8_t* input, uint32_t input_len,
                                        size_t* input_bytes_read, uint8_t* output,
                                        uint32_t output_max_len, size_t* decompressed_len,
                                        bool* stream_end, size_t* more_input_bytes,
                                        size_t* more_output_bytes) {
    uint8_t* src = input;
    size_t remaining_input_size = input_len;
    size_t ret = 1;
    *input_bytes_read = 0;

    if (_expect_dec_buf_size == -1) {
        // First call: read the frame header to learn the block size.
        if (input_len < 15) {
            return Status::InternalError(
                    "Lz4 header size is between 7 and 15 bytes. "
                    "but input size is only: {}",
                    input_len);
        }

        LZ4F_frameInfo_t info;
        // In: bytes available. Out: bytes of header actually consumed.
        ret = LZ4F_getFrameInfo(_dctx, &info, src, &remaining_input_size);
        if (LZ4F_isError(ret)) {
            return Status::InternalError("LZ4F_getFrameInfo error: {}",
                                         std::string(LZ4F_getErrorName(ret)));
        }

        _expect_dec_buf_size = get_block_size(&info);
        if (_expect_dec_buf_size == -1) {
            // `ret` is a success code at this point, so report the block size id
            // that could not be mapped rather than an unrelated LZ4F error name.
            return Status::InternalError(
                    "Impossible lz4 block size unless more block sizes are allowed, "
                    "block size id: {}",
                    static_cast<int>(info.blockSizeID));
        }

        const size_t header_size = remaining_input_size;
        *input_bytes_read = header_size;

        // Skip the header; whatever follows is frame payload.
        src += header_size;
        remaining_input_size = input_len - header_size;

        LOG(INFO) << "lz4 block size: " << _expect_dec_buf_size;
    }

    // Decompress. Both size arguments are in/out: capacity in, bytes used out.
    size_t output_len = output_max_len;
    ret = LZ4F_decompress(_dctx, output, &output_len, src, &remaining_input_size,
                          /* LZ4F_decompressOptions_t */ nullptr);
    if (LZ4F_isError(ret)) {
        return Status::InternalError("Decompression error: {}",
                                     std::string(LZ4F_getErrorName(ret)));
    }

    *input_bytes_read += remaining_input_size;
    *decompressed_len = output_len;
    // A zero return marks the end of the frame.
    *stream_end = (ret == 0);

    return Status::OK();
}
451
452
0
std::string Lz4FrameDecompressor::debug_info() {
453
0
    std::stringstream ss;
454
0
    ss << "Lz4FrameDecompressor."
455
0
       << " expect dec buf size: " << _expect_dec_buf_size
456
0
       << " Lz4 Frame Version: " << DORIS_LZ4F_VERSION;
457
0
    return ss.str();
458
0
}
459
460
50
size_t Lz4FrameDecompressor::get_block_size(const LZ4F_frameInfo_t* info) {
461
50
    switch (info->blockSizeID) {
462
0
    case LZ4F_default:
463
24
    case LZ4F_max64KB:
464
24
        return 1 << 16;
465
26
    case LZ4F_max256KB:
466
26
        return 1 << 18;
467
0
    case LZ4F_max1MB:
468
0
        return 1 << 20;
469
0
    case LZ4F_max4MB:
470
0
        return 1 << 22;
471
0
    default:
472
        // error
473
0
        return -1;
474
50
    }
475
50
}
476
477
/// Lz4BlockDecompressor
478
763
// This decompressor needs no setup, so initialization always succeeds.
Status Lz4BlockDecompressor::init() {
    return Status::OK();
}
481
482
// Hadoop lz4codec source :
483
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
484
// Example:
485
// OriginData(The original data will be divided into several large data block.) :
486
//      large data block1 | large data block2 | large data block3 | ....
487
// The large data block will be divided into several small data block.
488
// Suppose a large data block is divided into three small blocks:
489
// large data block1:            | small block1 | small block2 | small block3 |
490
// CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block2) ] [B3 compress(small block3)]>
491
//
492
// A : original length of the current block of large data block.
493
// sizeof(A) = 4 bytes.
494
// A = length(small block1) + length(small block2) + length(small block3)
495
// Bx : length of  small data block bx.
496
// sizeof(Bx) = 4 bytes.
497
// Bx = length(compress(small blockx))
498
// Decompresses Hadoop-style LZ4 block data: a sequence of "large blocks", each
// made of a 4-byte big-endian uncompressed length followed by one or more
// "small blocks" (4-byte big-endian compressed length + raw LZ4 payload).
//
// Only complete large blocks are consumed. If a large block cannot be finished
// because input or output space runs out, the cursors roll back to the start
// of that large block and the shortfall is reported:
//   *more_input_bytes  - additional input bytes needed
//   *more_output_bytes - additional output bytes needed
// *input_bytes_read and *decompressed_len are accumulated with `+=`, and
// *more_input_bytes / *more_output_bytes are read without being reset here.
// NOTE(review): this assumes the caller zero-initializes all four before the
// call - confirm against the callers.
// *stream_end is true when no additional input or output was requested.
//
// Returns InvalidArgument when LZ4_decompress_safe() rejects a small block,
// which includes a small block that would expand beyond the length its large
// block still has left.
Status Lz4BlockDecompressor::decompress(uint8_t* input, uint32_t input_len,
                                        size_t* input_bytes_read, uint8_t* output,
                                        uint32_t output_max_len, size_t* decompressed_len,
                                        bool* stream_end, size_t* more_input_bytes,
                                        size_t* more_output_bytes) {
    auto* input_ptr = input;
    auto* output_ptr = output;

    while (input_len > 0) {
        if (input_len < sizeof(uint32_t)) {
            *more_input_bytes = sizeof(uint32_t) - input_len;
            break;
        }

        // Remember where this large block starts so we can roll back on failure.
        auto* large_block_input_ptr = input_ptr;
        auto* large_block_output_ptr = output_ptr;

        uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

        input_ptr += sizeof(uint32_t);
        input_len -= sizeof(uint32_t);

        auto remaining_output_len = cast_set<uint32_t>(output_max_len - *decompressed_len);

        if (remaining_output_len < remaining_decompressed_large_block_len) {
            // Need more output buffer
            *more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;

            break;
        }

        std::size_t decompressed_large_block_len = 0;
        while (remaining_decompressed_large_block_len > 0) {
            // The small block header itself must be fully available.
            if (input_len < sizeof(uint32_t)) {
                *more_input_bytes = sizeof(uint32_t) - input_len;
                break;
            }

            // Read the length of the next lz4 compressed block.
            uint32_t compressed_small_block_len = BigEndian::Load32(input_ptr);

            input_ptr += sizeof(uint32_t);
            input_len -= sizeof(uint32_t);

            if (compressed_small_block_len == 0) {
                continue;
            }

            if (compressed_small_block_len > input_len) {
                // Need more input buffer
                *more_input_bytes = compressed_small_block_len - input_len;
                break;
            }

            // Decompress this block. The destination capacity is limited to what
            // the large block still has left: that never exceeds the free output
            // space (checked above), so a corrupt small block cannot write past
            // the output buffer or drive the remaining length below zero.
            auto decompressed_small_block_len = LZ4_decompress_safe(
                    reinterpret_cast<const char*>(input_ptr), reinterpret_cast<char*>(output_ptr),
                    static_cast<int>(compressed_small_block_len),
                    static_cast<int>(remaining_decompressed_large_block_len));
            if (decompressed_small_block_len < 0) {
                // LZ4_decompress_safe() reports failure as a plain negative int;
                // it is not an LZ4F (frame API) error code, so print it as is.
                return Status::InvalidArgument("Failed to do Lz4Block decompress, error = {}",
                                               decompressed_small_block_len);
            }
            input_ptr += compressed_small_block_len;
            input_len -= compressed_small_block_len;

            output_ptr += decompressed_small_block_len;
            remaining_decompressed_large_block_len -= decompressed_small_block_len;
            decompressed_large_block_len += decompressed_small_block_len;
        }

        if (*more_input_bytes != 0) {
            // Need more input buffer: discard the partially decoded large block.
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;
            break;
        }

        *decompressed_len += decompressed_large_block_len;
    }
    *input_bytes_read += (input_ptr - input);
    // If no more input and output need, means this is the end of a compressed block
    *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);

    return Status::OK();
}
587
588
0
// Returns a fixed label identifying this decompressor.
std::string Lz4BlockDecompressor::debug_info() {
    return std::string("Lz4BlockDecompressor.");
}
593
594
/// SnappyBlockDecompressor
595
714
// This decompressor needs no setup, so initialization always succeeds.
Status SnappyBlockDecompressor::init() {
    return Status::OK();
}
598
599
// Hadoop snappycodec source :
600
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
601
// Example:
602
// OriginData(The original data will be divided into several large data block.) :
603
//      large data block1 | large data block2 | large data block3 | ....
604
// The large data block will be divided into several small data block.
605
// Suppose a large data block is divided into three small blocks:
606
// large data block1:            | small block1 | small block2 | small block3 |
607
// CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block2) ] [B3 compress(small block3)]>
608
//
609
// A : original length of the current block of large data block.
610
// sizeof(A) = 4 bytes.
611
// A = length(small block1) + length(small block2) + length(small block3)
612
// Bx : length of  small data block bx.
613
// sizeof(Bx) = 4 bytes.
614
// Bx = length(compress(small blockx))
615
// Decompresses Hadoop-style Snappy block data: a sequence of "large blocks",
// each made of a 4-byte big-endian uncompressed length followed by one or more
// "small blocks" (4-byte big-endian compressed length + raw Snappy payload).
//
// Only complete large blocks are consumed. If a large block cannot be finished
// because input or output space runs out, the cursors roll back to the start
// of that large block and the shortfall is reported:
//   *more_input_bytes  - additional input bytes needed
//   *more_output_bytes - additional output bytes needed
// *input_bytes_read and *decompressed_len are accumulated with `+=`, and
// *more_input_bytes / *more_output_bytes are read without being reset here.
// NOTE(review): this assumes the caller zero-initializes all four before the
// call - confirm against the callers.
// *stream_end is true when no additional input or output was requested.
//
// Returns InternalError when a small block is malformed or claims to expand
// beyond the length its large block still has left.
Status SnappyBlockDecompressor::decompress(uint8_t* input, uint32_t input_len,
                                           size_t* input_bytes_read, uint8_t* output,
                                           uint32_t output_max_len, size_t* decompressed_len,
                                           bool* stream_end, size_t* more_input_bytes,
                                           size_t* more_output_bytes) {
    auto* input_ptr = input;
    auto* output_ptr = output;

    while (input_len > 0) {
        if (input_len < sizeof(uint32_t)) {
            *more_input_bytes = sizeof(uint32_t) - input_len;
            break;
        }

        // Remember where this large block starts so we can roll back on failure.
        auto* large_block_input_ptr = input_ptr;
        auto* large_block_output_ptr = output_ptr;

        uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

        input_ptr += sizeof(uint32_t);
        input_len -= sizeof(uint32_t);

        std::size_t remaining_output_len = output_max_len - *decompressed_len;

        if (remaining_output_len < remaining_decompressed_large_block_len) {
            // Need more output buffer
            *more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;

            break;
        }

        std::size_t decompressed_large_block_len = 0;
        while (remaining_decompressed_large_block_len > 0) {
            // The small block header itself must be fully available.
            if (input_len < sizeof(uint32_t)) {
                *more_input_bytes = sizeof(uint32_t) - input_len;
                break;
            }

            // Read the length of the next snappy compressed block.
            size_t compressed_small_block_len = BigEndian::Load32(input_ptr);

            input_ptr += sizeof(uint32_t);
            input_len -= sizeof(uint32_t);

            if (compressed_small_block_len == 0) {
                continue;
            }

            if (compressed_small_block_len > input_len) {
                // Need more input buffer
                *more_input_bytes = compressed_small_block_len - input_len;
                break;
            }

            // Decompress this block.
            size_t decompressed_small_block_len;
            if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input_ptr),
                                               compressed_small_block_len,
                                               &decompressed_small_block_len)) {
                return Status::InternalError("Failed to do snappy decompress.");
            }
            // RawUncompress() writes the whole uncompressed length with no bounds
            // check. The free output space was only validated against the large
            // block's declared length, so reject a small block that claims more
            // than the large block still has left; otherwise a corrupt stream
            // could write past the output buffer and underflow the counter below.
            if (decompressed_small_block_len > remaining_decompressed_large_block_len) {
                return Status::InternalError(
                        "Failed to do snappy decompress. uncompressed_len: {} exceeds remaining "
                        "block len: {}",
                        decompressed_small_block_len, remaining_decompressed_large_block_len);
            }
            if (!snappy::RawUncompress(reinterpret_cast<const char*>(input_ptr),
                                       compressed_small_block_len,
                                       reinterpret_cast<char*>(output_ptr))) {
                return Status::InternalError(
                        "Failed to do snappy decompress. uncompressed_len: {}, compressed_len: {}",
                        decompressed_small_block_len, compressed_small_block_len);
            }
            input_ptr += compressed_small_block_len;
            input_len -= compressed_small_block_len;

            output_ptr += decompressed_small_block_len;
            remaining_decompressed_large_block_len -= decompressed_small_block_len;
            decompressed_large_block_len += decompressed_small_block_len;
        }

        if (*more_input_bytes != 0) {
            // Need more input buffer: discard the partially decoded large block.
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;
            break;
        }

        *decompressed_len += decompressed_large_block_len;
    }
    *input_bytes_read += (input_ptr - input);
    // If no more input and output need, means this is the end of a compressed block
    *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);

    return Status::OK();
}
710
711
0
// Returns a fixed label identifying this decompressor.
std::string SnappyBlockDecompressor::debug_info() {
    return std::string("SnappyBlockDecompressor.");
}
716
717
#include "common/compile_check_end.h"
718
} // namespace doris