Coverage Report

Created: 2025-04-27 17:16

/root/doris/be/src/exec/decompressor.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/decompressor.h"
19
20
#include <strings.h>
21
22
#include <memory>
23
#include <ostream>
24
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "gutil/endian.h"
28
29
namespace doris {
30
31
/// Creates the decompressor that matches `type` and initializes it.
///
/// On return `*decompressor` owns the new instance, or is reset to nullptr for
/// CompressType::UNCOMPRESSED. Returns InternalError for a type with no case
/// below; otherwise returns the result of the new instance's init().
Status Decompressor::create_decompressor(CompressType type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    switch (type) {
    case CompressType::UNCOMPRESSED:
        decompressor->reset(nullptr);
        break;
    case CompressType::GZIP:
        decompressor->reset(new GzipDecompressor(false));
        break;
    case CompressType::DEFLATE:
        // Same implementation as GZIP, constructed with is_deflate = true.
        decompressor->reset(new GzipDecompressor(true));
        break;
    case CompressType::BZIP2:
        decompressor->reset(new Bzip2Decompressor());
        break;
    case CompressType::ZSTD:
        decompressor->reset(new ZstdDecompressor());
        break;
    case CompressType::LZ4FRAME:
        decompressor->reset(new Lz4FrameDecompressor());
        break;
    case CompressType::LZ4BLOCK:
        decompressor->reset(new Lz4BlockDecompressor());
        break;
    case CompressType::SNAPPYBLOCK:
        decompressor->reset(new SnappyBlockDecompressor());
        break;
    case CompressType::LZOP:
        decompressor->reset(new LzopDecompressor());
        break;
    default:
        return Status::InternalError("Unknown compress type: {}", type);
    }

    // UNCOMPRESSED leaves the pointer null, so there is nothing to initialize.
    if (*decompressor == nullptr) {
        return Status::OK();
    }
    return (*decompressor)->init();
}
72
73
/// Translates a TFileCompressType into the internal CompressType and delegates
/// to the CompressType overload.
///
/// PLAIN and UNKNOWN both map to UNCOMPRESSED; LZO and LZOP both map to LZOP.
/// Returns InternalError for a value with no case below.
Status Decompressor::create_decompressor(TFileCompressType::type type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    CompressType mapped_type;
    switch (type) {
    case TFileCompressType::PLAIN:
    case TFileCompressType::UNKNOWN:
        mapped_type = CompressType::UNCOMPRESSED;
        break;
    case TFileCompressType::GZ:
        mapped_type = CompressType::GZIP;
        break;
    case TFileCompressType::LZO:
    case TFileCompressType::LZOP:
        mapped_type = CompressType::LZOP;
        break;
    case TFileCompressType::BZ2:
        mapped_type = CompressType::BZIP2;
        break;
    case TFileCompressType::ZSTD:
        mapped_type = CompressType::ZSTD;
        break;
    case TFileCompressType::LZ4FRAME:
        mapped_type = CompressType::LZ4FRAME;
        break;
    case TFileCompressType::LZ4BLOCK:
        mapped_type = CompressType::LZ4BLOCK;
        break;
    case TFileCompressType::DEFLATE:
        mapped_type = CompressType::DEFLATE;
        break;
    case TFileCompressType::SNAPPYBLOCK:
        mapped_type = CompressType::SNAPPYBLOCK;
        break;
    default:
        return Status::InternalError<false>("unknown compress type: {}", type);
    }

    RETURN_IF_ERROR(Decompressor::create_decompressor(mapped_type, decompressor));
    return Status::OK();
}
113
114
/// Derives the CompressType implied by a CSV/proto file format and delegates to
/// the CompressType overload.
///
/// FORMAT_PROTO and FORMAT_CSV_PLAIN are treated as uncompressed. Returns
/// InternalError for a format with no case below.
Status Decompressor::create_decompressor(TFileFormatType::type type,
                                         std::unique_ptr<Decompressor>* decompressor) {
    CompressType mapped_type;
    switch (type) {
    case TFileFormatType::FORMAT_PROTO:
    case TFileFormatType::FORMAT_CSV_PLAIN:
        mapped_type = CompressType::UNCOMPRESSED;
        break;
    case TFileFormatType::FORMAT_CSV_GZ:
        mapped_type = CompressType::GZIP;
        break;
    case TFileFormatType::FORMAT_CSV_BZ2:
        mapped_type = CompressType::BZIP2;
        break;
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
        mapped_type = CompressType::LZ4FRAME;
        break;
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
        mapped_type = CompressType::LZ4BLOCK;
        break;
    case TFileFormatType::FORMAT_CSV_LZOP:
        mapped_type = CompressType::LZOP;
        break;
    case TFileFormatType::FORMAT_CSV_DEFLATE:
        mapped_type = CompressType::DEFLATE;
        break;
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
        mapped_type = CompressType::SNAPPYBLOCK;
        break;
    default:
        return Status::InternalError<false>("unknown compress type: {}", type);
    }

    RETURN_IF_ERROR(Decompressor::create_decompressor(mapped_type, decompressor));
    return Status::OK();
}
151
152
0
/// Reads four bytes starting at `buf` as a big-endian unsigned 32-bit integer
/// (buf[0] is the most significant byte).
uint32_t Decompressor::_read_int32(uint8_t* buf) {
    // Widen each byte to uint32_t before shifting: a bare uint8_t promotes to
    // (signed) int, and shifting a value >= 0x80 left by 24 would land in the
    // sign bit.
    return (static_cast<uint32_t>(buf[0]) << 24) | (static_cast<uint32_t>(buf[1]) << 16) |
           (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
}
155
156
0
/// Returns a fixed label identifying the base decompressor.
std::string Decompressor::debug_info() {
    return std::string("Decompressor");
}
159
160
// Gzip
161
// Constructs a gzip/deflate decompressor. `is_deflate` selects
// CompressType::DEFLATE for the base class (CompressType::GZIP otherwise) and is
// stored in _is_deflate, which init() reads to choose the zlib window bits.
GzipDecompressor::GzipDecompressor(bool is_deflate)
162
        : Decompressor(is_deflate ? CompressType::DEFLATE : CompressType::GZIP),
163
0
          _is_deflate(is_deflate) {}
164
165
0
/// Releases the zlib inflate state; the return code is deliberately ignored.
GzipDecompressor::~GzipDecompressor() {
    static_cast<void>(inflateEnd(&_z_strm));
}
168
169
0
/// Prepares the zlib stream for inflating.
///
/// Deflate input uses WINDOW_BITS as is; gzip input additionally ORs in
/// DETECT_CODEC. Returns InternalError when inflateInit2 reports a negative code.
Status GzipDecompressor::init() {
    // Start from a zeroed stream and set the allocator hooks explicitly.
    _z_strm = {};
    _z_strm.zalloc = Z_NULL;
    _z_strm.zfree = Z_NULL;
    _z_strm.opaque = Z_NULL;

    const int window_bits = _is_deflate ? WINDOW_BITS : (WINDOW_BITS | DETECT_CODEC);
    if (const int rc = inflateInit2(&_z_strm, window_bits); rc < 0) {
        return Status::InternalError("Failed to init inflate. status code: {}", rc);
    }

    return Status::OK();
}
183
184
/// Inflates as much of `input` into `output` as both buffers allow.
///
/// @param input_bytes_read  set to the number of input bytes consumed so far
/// @param decompressed_len  set to the number of output bytes produced so far
/// @param stream_end        true when the last inflate() call hit the end of a
///                          gzip stream (the stream is then reset so a following
///                          concatenated stream can be decoded)
/// `more_input_bytes` and `more_output_bytes` are not touched by this
/// implementation. The three out-params above are only written once the loop
/// runs, i.e. when both input_len and output_max_len are non-zero.
Status GzipDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
                                    uint8_t* output, size_t output_max_len,
                                    size_t* decompressed_len, bool* stream_end,
                                    size_t* more_input_bytes, size_t* more_output_bytes) {
    // Hand the caller's buffers to zlib.
    _z_strm.next_in = input;
    _z_strm.avail_in = input_len;
    _z_strm.next_out = output;
    _z_strm.avail_out = output_max_len;

    while (_z_strm.avail_out > 0 && _z_strm.avail_in > 0) {
        *stream_end = false;

        // inflate() consumes input from next_in and/or produces output at
        // next_out, updating the avail_* counters; Z_OK means it made progress.
        int ret = inflate(&_z_strm, Z_NO_FLUSH);
        *input_bytes_read = input_len - _z_strm.avail_in;
        *decompressed_len = output_max_len - _z_strm.avail_out;

        VLOG_TRACE << "gzip dec ret: " << ret << " input_bytes_read: " << *input_bytes_read
                   << " decompressed_len: " << *decompressed_len;

        switch (ret) {
        case Z_OK:
            // Not finished yet: loop again while both buffers have room.
            // ATTN: decompressed_len may still be zero after a Z_OK.
            break;
        case Z_BUF_ERROR:
            // No progress possible with the current buffers; the caller may
            // call again with more input or more output space.
            return Status::OK();
        case Z_STREAM_END:
            *stream_end = true;
            // Reset so a subsequent gzip stream in the same input can be decoded.
            ret = inflateReset(&_z_strm);
            if (ret != Z_OK) {
                return Status::InternalError("Failed to inflateReset. return code: {}", ret);
            }
            break;
        default:
            return Status::InternalError("Failed to inflate. return code: {}", ret);
        }
    }

    return Status::OK();
}
235
236
0
std::string GzipDecompressor::debug_info() {
237
0
    std::stringstream ss;
238
0
    ss << "GzipDecompressor."
239
0
       << " is_deflate: " << _is_deflate;
240
0
    return ss.str();
241
0
}
242
243
// Bzip2
244
0
/// Releases the bzip2 decompression state; the return code is ignored.
Bzip2Decompressor::~Bzip2Decompressor() {
    static_cast<void>(BZ2_bzDecompressEnd(&_bz_strm));
}
247
248
0
/// Zeroes the bz2 stream and initializes it for decompression.
/// Returns InternalError when BZ2_bzDecompressInit does not return BZ_OK.
Status Bzip2Decompressor::init() {
    bzero(&_bz_strm, sizeof(_bz_strm));
    if (const int rc = BZ2_bzDecompressInit(&_bz_strm, 0, 0); rc != BZ_OK) {
        return Status::InternalError("Failed to init bz2. status code: {}", rc);
    }

    return Status::OK();
}
257
258
/// Decompresses as much of `input` into `output` as both buffers allow.
///
/// @param input_bytes_read  set to the number of input bytes consumed so far
/// @param decompressed_len  set to the number of output bytes produced so far
/// @param stream_end        true when the last call reached BZ_STREAM_END; the
///                          stream is then torn down and re-initialized so the
///                          loop can continue with any remaining input
/// `more_input_bytes` and `more_output_bytes` are not touched by this
/// implementation.
Status Bzip2Decompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
                                     uint8_t* output, size_t output_max_len,
                                     size_t* decompressed_len, bool* stream_end,
                                     size_t* more_input_bytes, size_t* more_output_bytes) {
    // Hand the caller's buffers to libbz2, which works on char pointers.
    _bz_strm.next_in = reinterpret_cast<char*>(input);
    _bz_strm.avail_in = input_len;
    _bz_strm.next_out = reinterpret_cast<char*>(output);
    _bz_strm.avail_out = output_max_len;

    while (_bz_strm.avail_out > 0 && _bz_strm.avail_in > 0) {
        *stream_end = false;

        int ret = BZ2_bzDecompress(&_bz_strm);
        *input_bytes_read = input_len - _bz_strm.avail_in;
        *decompressed_len = output_max_len - _bz_strm.avail_out;

        switch (ret) {
        case BZ_OK:
            // More work may remain; keep looping.
            break;
        case BZ_DATA_ERROR:
        case BZ_DATA_ERROR_MAGIC:
            LOG(INFO) << "input_bytes_read: " << *input_bytes_read
                      << " decompressed_len: " << *decompressed_len;
            return Status::InternalError("Failed to bz2 decompress. status code: {}", ret);
        case BZ_STREAM_END:
            *stream_end = true;
            // Recycle the stream state before continuing with remaining input.
            ret = BZ2_bzDecompressEnd(&_bz_strm);
            if (ret != BZ_OK) {
                return Status::InternalError(
                        "Failed to end bz2 after meet BZ_STREAM_END. status code: {}", ret);
            }

            ret = BZ2_bzDecompressInit(&_bz_strm, 0, 0);
            if (ret != BZ_OK) {
                return Status::InternalError(
                        "Failed to init bz2 after meet BZ_STREAM_END. status code: {}", ret);
            }
            break;
        default:
            return Status::InternalError("Failed to bz2 decompress. status code: {}", ret);
        }
    }

    return Status::OK();
}
301
302
0
/// Returns a fixed label identifying this decompressor.
std::string Bzip2Decompressor::debug_info() {
    return std::string("Bzip2Decompressor.");
}
307
308
0
/// Frees the zstd decompression stream; the return value is ignored.
ZstdDecompressor::~ZstdDecompressor() {
    static_cast<void>(ZSTD_freeDStream(_zstd_strm));
}
311
312
0
/// Creates and initializes the zstd decompression stream.
///
/// Returns InternalError if the stream cannot be created or if
/// ZSTD_initDStream reports an error.
Status ZstdDecompressor::init() {
    _zstd_strm = ZSTD_createDStream();
    if (_zstd_strm == nullptr) {
        return Status::InternalError("ZSTD_dctx creation error");
    }

    const auto ret = ZSTD_initDStream(_zstd_strm);
    if (ZSTD_isError(ret)) {
        return Status::InternalError("ZSTD_initDStream error: {}", ZSTD_getErrorName(ret));
    }
    return Status::OK();
}
324
325
/// Runs one ZSTD_decompressStream() step over the given buffers.
///
/// @param input_bytes_read  set to the number of input bytes consumed
/// @param decompressed_len  set to the number of output bytes produced
/// @param stream_end        set to true when ZSTD_decompressStream returns 0;
///                          only written when the call did not fail
/// `more_input_bytes` and `more_output_bytes` are not touched by this
/// implementation. Returns InternalError when zstd reports an error.
Status ZstdDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
                                    uint8_t* output, size_t output_max_len,
                                    size_t* decompressed_len, bool* stream_end,
                                    size_t* more_input_bytes, size_t* more_output_bytes) {
    // 1. set input and output
    ZSTD_inBuffer inputBuffer = {input, input_len, 0};
    ZSTD_outBuffer outputBuffer = {output, output_max_len, 0};

    // 2. decompress
    // Keep the result as size_t: that is what ZSTD_decompressStream returns and
    // what ZSTD_isError / ZSTD_getErrorName take. Narrowing it to int would
    // truncate the value before the error check and the "== 0" test below.
    const size_t ret = ZSTD_decompressStream(_zstd_strm, &outputBuffer, &inputBuffer);
    *input_bytes_read = inputBuffer.pos;
    *decompressed_len = outputBuffer.pos;

    if (ZSTD_isError(ret)) {
        return Status::InternalError("Failed to zstd decompress: {}", ZSTD_getErrorName(ret));
    }

    *stream_end = (ret == 0);
    return Status::OK();
}
345
346
0
/// Returns a fixed label identifying this decompressor.
std::string ZstdDecompressor::debug_info() {
    return std::string("ZstdDecompressor.");
}
351
352
// Lz4Frame
353
// Lz4 version: 1.7.5
354
// define LZ4F_VERSION = 100
355
// Version value passed to LZ4F_createDecompressionContext() in init() and
// reported by debug_info().
const unsigned Lz4FrameDecompressor::DORIS_LZ4F_VERSION = 100;
356
357
0
/// Frees the LZ4 frame decompression context; the return value is ignored.
Lz4FrameDecompressor::~Lz4FrameDecompressor() {
    static_cast<void>(LZ4F_freeDecompressionContext(_dctx));
}
360
361
0
/// Creates the LZ4 frame decompression context and marks the expected block
/// size as "not yet known" (-1) so the first decompress() call parses the
/// frame header.
Status Lz4FrameDecompressor::init() {
    const size_t rc = LZ4F_createDecompressionContext(&_dctx, DORIS_LZ4F_VERSION);
    if (LZ4F_isError(rc)) {
        return Status::InternalError("LZ4F_dctx creation error: {}",
                                     std::string(LZ4F_getErrorName(rc)));
    }

    // -1 is the "uninitialized" sentinel checked by decompress().
    _expect_dec_buf_size = -1;

    return Status::OK();
}
374
375
/// Decompresses LZ4 frame data from `input` into `output`.
///
/// On the first call (while _expect_dec_buf_size is still -1) `input` must start
/// at the frame header: the header is parsed to learn the block size, and at
/// least 15 input bytes are required. Afterwards one LZ4F_decompress() step is
/// run on the remaining input.
///
/// @param input_bytes_read  header bytes (if parsed) plus bytes consumed by
///                          LZ4F_decompress
/// @param decompressed_len  bytes written to `output`
/// @param stream_end        true when LZ4F_decompress returns 0
/// `more_input_bytes` and `more_output_bytes` are not touched by this
/// implementation.
Status Lz4FrameDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
                                        uint8_t* output, size_t output_max_len,
                                        size_t* decompressed_len, bool* stream_end,
                                        size_t* more_input_bytes, size_t* more_output_bytes) {
    uint8_t* cursor = input;
    // In/out argument for the LZ4F calls: available bytes in, consumed bytes out.
    size_t src_size = input_len;
    size_t ret = 1;
    *input_bytes_read = 0;

    if (_expect_dec_buf_size == -1) {
        // First call: the block size is still unknown, so `input` is expected to
        // point at the head of the compressed file where the lz4 header lives.
        if (input_len < 15) {
            return Status::InternalError(
                    "Lz4 header size is between 7 and 15 bytes. "
                    "but input size is only: {}",
                    input_len);
        }

        LZ4F_frameInfo_t info;
        ret = LZ4F_getFrameInfo(_dctx, &info, static_cast<void*>(cursor), &src_size);
        if (LZ4F_isError(ret)) {
            return Status::InternalError("LZ4F_getFrameInfo error: {}",
                                         std::string(LZ4F_getErrorName(ret)));
        }

        _expect_dec_buf_size = get_block_size(&info);
        if (_expect_dec_buf_size == -1) {
            return Status::InternalError(
                    "Impossible lz4 block size unless more block sizes are allowed {}",
                    std::string(LZ4F_getErrorName(ret)));
        }

        // src_size now holds the number of header bytes consumed; skip them and
        // recompute how much input is left for the decompress step.
        *input_bytes_read = src_size;
        cursor += src_size;
        src_size = input_len - src_size;

        LOG(INFO) << "lz4 block size: " << _expect_dec_buf_size;
    }

    // In/out argument: output capacity in, bytes written out.
    size_t dst_size = output_max_len;
    ret = LZ4F_decompress(_dctx, static_cast<void*>(output), &dst_size,
                          static_cast<void*>(cursor), &src_size,
                          /* LZ4F_decompressOptions_t */ nullptr);
    if (LZ4F_isError(ret)) {
        return Status::InternalError("Decompression error: {}",
                                     std::string(LZ4F_getErrorName(ret)));
    }

    *input_bytes_read += src_size;
    *decompressed_len = dst_size;
    *stream_end = (ret == 0);

    return Status::OK();
}
439
440
0
std::string Lz4FrameDecompressor::debug_info() {
441
0
    std::stringstream ss;
442
0
    ss << "Lz4FrameDecompressor."
443
0
       << " expect dec buf size: " << _expect_dec_buf_size
444
0
       << " Lz4 Frame Version: " << DORIS_LZ4F_VERSION;
445
0
    return ss.str();
446
0
}
447
448
0
size_t Lz4FrameDecompressor::get_block_size(const LZ4F_frameInfo_t* info) {
449
0
    switch (info->blockSizeID) {
450
0
    case LZ4F_default:
451
0
    case LZ4F_max64KB:
452
0
        return 1 << 16;
453
0
    case LZ4F_max256KB:
454
0
        return 1 << 18;
455
0
    case LZ4F_max1MB:
456
0
        return 1 << 20;
457
0
    case LZ4F_max4MB:
458
0
        return 1 << 22;
459
0
    default:
460
        // error
461
0
        return -1;
462
0
    }
463
0
}
464
465
/// Lz4BlockDecompressor
466
0
// Nothing to set up here: decompress() below uses no member state.
Status Lz4BlockDecompressor::init() {
467
0
    return Status::OK();
468
0
}
469
470
// Hadoop lz4codec source :
471
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
472
// Example:
473
// OriginData(The original data will be divided into several large data block.) :
474
//      large data block1 | large data block2 | large data block3 | ....
475
// The large data block will be divided into several small data block.
476
// Suppose a large data block is divided into three small blocks:
477
// large data block1:            | small block1 | small block2 | small block3 |
478
// CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block2) ] [B3 compress(small block3)]>
479
//
480
// A : original length of the current block of large data block.
481
// sizeof(A) = 4 bytes.
482
// A = length(small block1) + length(small block2) + length(small block3)
483
// Bx : length of  small data block bx.
484
// sizeof(Bx) = 4 bytes.
485
// Bx = length(compress(small blockx))
486
/// Decompresses Hadoop-style LZ4 block data: a sequence of "large blocks", each
/// a 4-byte big-endian uncompressed length followed by one or more
/// [4-byte big-endian compressed length][lz4 block] "small blocks".
///
/// Only whole large blocks are committed. If a large block cannot be completed
/// with the available input or output space, the cursors roll back to the start
/// of that block and the shortfall is reported through `more_input_bytes` or
/// `more_output_bytes`.
///
/// @param input_bytes_read  incremented by the number of input bytes committed
/// @param decompressed_len  read as the number of bytes already in `output`,
///                          then incremented by the bytes committed here
/// @param stream_end        true when neither more input nor more output is
///                          required
/// NOTE(review): `*input_bytes_read`, `*decompressed_len`, `*more_input_bytes`
/// and `*more_output_bytes` are read before being written, so the caller is
/// presumably expected to zero them before the call — confirm against callers.
///
/// Returns InvalidArgument when a small block is corrupt or would decompress to
/// more bytes than its large block declared.
Status Lz4BlockDecompressor::decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
                                        uint8_t* output, size_t output_max_len,
                                        size_t* decompressed_len, bool* stream_end,
                                        size_t* more_input_bytes, size_t* more_output_bytes) {
    auto* input_ptr = input;
    auto* output_ptr = output;

    while (input_len > 0) {
        if (input_len < sizeof(uint32_t)) {
            *more_input_bytes = sizeof(uint32_t) - input_len;
            break;
        }

        // If this large block cannot be completed, fall back to its beginning.
        auto* large_block_input_ptr = input_ptr;
        auto* large_block_output_ptr = output_ptr;

        uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

        input_ptr += sizeof(uint32_t);
        input_len -= sizeof(uint32_t);

        std::size_t remaining_output_len = output_max_len - *decompressed_len;

        if (remaining_output_len < remaining_decompressed_large_block_len) {
            // Need more output buffer
            *more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;

            break;
        }

        std::size_t decompressed_large_block_len = 0;
        while (remaining_decompressed_large_block_len > 0) {
            // Check that input length should not be negative.
            if (input_len < sizeof(uint32_t)) {
                *more_input_bytes = sizeof(uint32_t) - input_len;
                break;
            }

            // Read the length of the next lz4 compressed block.
            size_t compressed_small_block_len = BigEndian::Load32(input_ptr);

            input_ptr += sizeof(uint32_t);
            input_len -= sizeof(uint32_t);

            if (compressed_small_block_len == 0) {
                continue;
            }

            if (compressed_small_block_len > input_len) {
                // Need more input buffer
                *more_input_bytes = compressed_small_block_len - input_len;
                break;
            }

            // LZ4_decompress_safe takes its sizes as int.
            if (compressed_small_block_len > static_cast<size_t>(INT32_MAX)) {
                return Status::InvalidArgument("fail to do LZ4 decompress, error = {}",
                                               "compressed block too large");
            }

            // Bound the destination by what this large block may still produce.
            // That is never more than the space left in `output` (checked above),
            // so a corrupt block can neither write past the buffer nor underflow
            // the remaining-length counter below.
            const int dst_capacity =
                    remaining_decompressed_large_block_len > static_cast<uint32_t>(INT32_MAX)
                            ? INT32_MAX
                            : static_cast<int>(remaining_decompressed_large_block_len);

            // Decompress this block.
            const int decompressed_small_block_len = LZ4_decompress_safe(
                    reinterpret_cast<const char*>(input_ptr), reinterpret_cast<char*>(output_ptr),
                    static_cast<int>(compressed_small_block_len), dst_capacity);
            if (decompressed_small_block_len < 0) {
                // The negative return is a plain error indicator of the block
                // API, not an LZ4F (frame API) error code, so report it as is.
                return Status::InvalidArgument("fail to do LZ4 decompress, error = {}",
                                               decompressed_small_block_len);
            }
            input_ptr += compressed_small_block_len;
            input_len -= compressed_small_block_len;

            output_ptr += decompressed_small_block_len;
            remaining_decompressed_large_block_len -= decompressed_small_block_len;
            decompressed_large_block_len += decompressed_small_block_len;
        }

        if (*more_input_bytes != 0) {
            // Need more input buffer: discard the partially decoded large block.
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;
            break;
        }

        *decompressed_len += decompressed_large_block_len;
    }
    *input_bytes_read += (input_ptr - input);
    // If no more input and output need, means this is the end of a compressed block
    *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);

    return Status::OK();
}
574
575
0
/// Returns a fixed label identifying this decompressor.
std::string Lz4BlockDecompressor::debug_info() {
    return std::string("Lz4BlockDecompressor.");
}
580
581
/// SnappyBlockDecompressor
582
0
// Nothing to set up here: decompress() below uses no member state.
Status SnappyBlockDecompressor::init() {
583
0
    return Status::OK();
584
0
}
585
586
// Hadoop snappycodec source :
587
// https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
588
// Example:
589
// OriginData(The original data will be divided into several large data block.) :
590
//      large data block1 | large data block2 | large data block3 | ....
591
// The large data block will be divided into several small data block.
592
// Suppose a large data block is divided into three small blocks:
593
// large data block1:            | small block1 | small block2 | small block3 |
594
// CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block2) ] [B3 compress(small block3)]>
595
//
596
// A : original length of the current block of large data block.
597
// sizeof(A) = 4 bytes.
598
// A = length(small block1) + length(small block2) + length(small block3)
599
// Bx : length of  small data block bx.
600
// sizeof(Bx) = 4 bytes.
601
// Bx = length(compress(small blockx))
602
/// Decompresses Hadoop-style snappy block data: a sequence of "large blocks",
/// each a 4-byte big-endian uncompressed length followed by one or more
/// [4-byte big-endian compressed length][snappy block] "small blocks".
///
/// Only whole large blocks are committed. If a large block cannot be completed
/// with the available input or output space, the cursors roll back to the start
/// of that block and the shortfall is reported through `more_input_bytes` or
/// `more_output_bytes`.
///
/// @param input_bytes_read  incremented by the number of input bytes committed
/// @param decompressed_len  read as the number of bytes already in `output`,
///                          then incremented by the bytes committed here
/// @param stream_end        true when neither more input nor more output is
///                          required
/// NOTE(review): `*input_bytes_read`, `*decompressed_len`, `*more_input_bytes`
/// and `*more_output_bytes` are read before being written, so the caller is
/// presumably expected to zero them before the call — confirm against callers.
///
/// Returns InternalError when a small block is corrupt or declares more
/// uncompressed bytes than its large block has left.
Status SnappyBlockDecompressor::decompress(uint8_t* input, size_t input_len,
                                           size_t* input_bytes_read, uint8_t* output,
                                           size_t output_max_len, size_t* decompressed_len,
                                           bool* stream_end, size_t* more_input_bytes,
                                           size_t* more_output_bytes) {
    auto* input_ptr = input;
    auto* output_ptr = output;

    while (input_len > 0) {
        if (input_len < sizeof(uint32_t)) {
            *more_input_bytes = sizeof(uint32_t) - input_len;
            break;
        }

        // If this large block cannot be completed, fall back to its beginning.
        auto* large_block_input_ptr = input_ptr;
        auto* large_block_output_ptr = output_ptr;

        uint32_t remaining_decompressed_large_block_len = BigEndian::Load32(input_ptr);

        input_ptr += sizeof(uint32_t);
        input_len -= sizeof(uint32_t);

        std::size_t remaining_output_len = output_max_len - *decompressed_len;

        if (remaining_output_len < remaining_decompressed_large_block_len) {
            // Need more output buffer
            *more_output_bytes = remaining_decompressed_large_block_len - remaining_output_len;
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;

            break;
        }

        std::size_t decompressed_large_block_len = 0;
        while (remaining_decompressed_large_block_len > 0) {
            // Check that input length should not be negative.
            if (input_len < sizeof(uint32_t)) {
                *more_input_bytes = sizeof(uint32_t) - input_len;
                break;
            }

            // Read the length of the next snappy compressed block.
            size_t compressed_small_block_len = BigEndian::Load32(input_ptr);

            input_ptr += sizeof(uint32_t);
            input_len -= sizeof(uint32_t);

            if (compressed_small_block_len == 0) {
                continue;
            }

            if (compressed_small_block_len > input_len) {
                // Need more input buffer
                *more_input_bytes = compressed_small_block_len - input_len;
                break;
            }

            // Decompress this block.
            size_t decompressed_small_block_len;
            if (!snappy::GetUncompressedLength(reinterpret_cast<const char*>(input_ptr),
                                               compressed_small_block_len,
                                               &decompressed_small_block_len)) {
                return Status::InternalError(
                        "snappy block decompress failed to get uncompressed len");
            }
            // RawUncompress writes decompressed_small_block_len bytes without
            // knowing the size of `output`. Reject a block that claims more than
            // this large block has left: that remainder is known to fit in the
            // output buffer (checked above), and the check also keeps the
            // remaining-length counter below from underflowing.
            if (decompressed_small_block_len > remaining_decompressed_large_block_len) {
                return Status::InternalError(
                        "snappy block decompress failed. uncompressed_len: {} exceeds remaining "
                        "block len: {}",
                        decompressed_small_block_len, remaining_decompressed_large_block_len);
            }
            if (!snappy::RawUncompress(reinterpret_cast<const char*>(input_ptr),
                                       compressed_small_block_len,
                                       reinterpret_cast<char*>(output_ptr))) {
                return Status::InternalError(
                        "snappy block decompress failed. uncompressed_len: {}, compressed_len: {}",
                        decompressed_small_block_len, compressed_small_block_len);
            }
            input_ptr += compressed_small_block_len;
            input_len -= compressed_small_block_len;

            output_ptr += decompressed_small_block_len;
            remaining_decompressed_large_block_len -= decompressed_small_block_len;
            decompressed_large_block_len += decompressed_small_block_len;
        }

        if (*more_input_bytes != 0) {
            // Need more input buffer: discard the partially decoded large block.
            input_ptr = large_block_input_ptr;
            output_ptr = large_block_output_ptr;
            break;
        }

        *decompressed_len += decompressed_large_block_len;
    }
    *input_bytes_read += (input_ptr - input);
    // If no more input and output need, means this is the end of a compressed block
    *stream_end = (*more_input_bytes == 0 && *more_output_bytes == 0);

    return Status::OK();
}
698
699
0
/// Returns a fixed label identifying this decompressor.
std::string SnappyBlockDecompressor::debug_info() {
    return std::string("SnappyBlockDecompressor.");
}
704
705
} // namespace doris