Coverage Report

Created: 2024-11-20 12:06

/root/doris/be/src/exec/decompressor.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bzlib.h>
21
#include <lz4/lz4.h>
22
#include <lz4/lz4frame.h>
23
#include <lz4/lz4hc.h>
24
#include <snappy.h>
25
#include <stddef.h>
26
#include <stdint.h>
27
#include <zlib.h>
28
#include <zstd.h>
29
30
#include <memory>
31
#include <string>
32
33
#include "common/status.h"
34
#include "gen_cpp/PlanNodes_types.h"
35
36
namespace doris {
37
38
// Compression codecs selectable through Decompressor::create_decompressor(CompressType, ...).
// The explicit values spell out the same 0-based numbering the declaration order implies.
enum CompressType {
    UNCOMPRESSED = 0,
    GZIP = 1,
    DEFLATE = 2,
    BZIP2 = 3,
    ZSTD = 4,
    LZ4FRAME = 5,
    LZOP = 6,
    LZ4BLOCK = 7,
    SNAPPYBLOCK = 8
};
49
50
class Decompressor {
51
public:
52
0
    virtual ~Decompressor() = default;
53
54
    // implement in derived class
55
    // input(in):               buf where decompress begin
56
    // input_len(in):           max length of input buf
57
    // input_bytes_read(out):   bytes which is consumed by decompressor
58
    // output(out):             buf where to save decompressed data
59
    // output_max_len(in):      max length of output buf
60
    // decompressed_len(out):   decompressed data size in output buf
61
    // stream_end(out):         true if reach the and of stream,
62
    //                          or normally finished decompressing entire block
63
    // more_input_bytes(out):   decompressor need more bytes to consume
64
    // more_output_bytes(out):  decompressor need more space to save decompressed data
65
    //
66
    // input and output buf should be allocated and released outside
67
    virtual Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read,
68
                              uint8_t* output, size_t output_max_len, size_t* decompressed_len,
69
                              bool* stream_end, size_t* more_input_bytes,
70
                              size_t* more_output_bytes) = 0;
71
72
public:
73
    static Status create_decompressor(CompressType type,
74
                                      std::unique_ptr<Decompressor>* decompressor);
75
76
    static Status create_decompressor(TFileCompressType::type type,
77
                                      std::unique_ptr<Decompressor>* decompressor);
78
79
    static Status create_decompressor(TFileFormatType::type type,
80
                                      std::unique_ptr<Decompressor>* decompressor);
81
82
    virtual std::string debug_info();
83
84
0
    CompressType get_type() { return _ctype; }
85
86
protected:
87
    virtual Status init() = 0;
88
89
    static uint32_t _read_int32(uint8_t* buf);
90
91
0
    Decompressor(CompressType ctype) : _ctype(ctype) {}
92
93
    CompressType _ctype;
94
};
95
96
class GzipDecompressor : public Decompressor {
97
public:
98
    ~GzipDecompressor() override;
99
100
    Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
101
                      size_t output_max_len, size_t* decompressed_len, bool* stream_end,
102
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
103
104
    std::string debug_info() override;
105
106
private:
107
    friend class Decompressor;
108
    GzipDecompressor(bool is_deflate);
109
    Status init() override;
110
111
private:
112
    bool _is_deflate;
113
114
    z_stream _z_strm;
115
116
    // These are magic numbers from zlib.h.  Not clear why they are not defined there.
117
    const static int WINDOW_BITS = 15;  // Maximum window size
118
    const static int DETECT_CODEC = 32; // Determine if this is libz or gzip from header.
119
};
120
121
class Bzip2Decompressor : public Decompressor {
122
public:
123
    ~Bzip2Decompressor() override;
124
125
    Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
126
                      size_t output_max_len, size_t* decompressed_len, bool* stream_end,
127
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
128
129
    std::string debug_info() override;
130
131
private:
132
    friend class Decompressor;
133
0
    Bzip2Decompressor() : Decompressor(CompressType::BZIP2) {}
134
    Status init() override;
135
136
private:
137
    bz_stream _bz_strm;
138
};
139
140
class ZstdDecompressor : public Decompressor {
141
public:
142
    ~ZstdDecompressor() override;
143
144
    Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
145
                      size_t output_max_len, size_t* decompressed_len, bool* stream_end,
146
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
147
148
    std::string debug_info() override;
149
150
private:
151
    friend class Decompressor;
152
0
    ZstdDecompressor() : Decompressor(CompressType::ZSTD) {}
153
    Status init() override;
154
155
private:
156
    ZSTD_DStream* _zstd_strm {nullptr};
157
};
158
159
class Lz4FrameDecompressor : public Decompressor {
160
public:
161
    ~Lz4FrameDecompressor() override;
162
163
    Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
164
                      size_t output_max_len, size_t* decompressed_len, bool* stream_end,
165
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
166
167
    std::string debug_info() override;
168
169
private:
170
    friend class Decompressor;
171
0
    Lz4FrameDecompressor() : Decompressor(CompressType::LZ4FRAME) {}
172
    Status init() override;
173
174
    size_t get_block_size(const LZ4F_frameInfo_t* info);
175
176
private:
177
    LZ4F_dctx* _dctx = nullptr;
178
    size_t _expect_dec_buf_size;
179
    const static unsigned DORIS_LZ4F_VERSION;
180
};
181
182
class Lz4BlockDecompressor : public Decompressor {
183
public:
184
0
    ~Lz4BlockDecompressor() override {}
185
186
    Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
187
                      size_t output_max_len, size_t* decompressed_len, bool* stream_end,
188
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
189
190
    std::string debug_info() override;
191
192
private:
193
    friend class Decompressor;
194
0
    Lz4BlockDecompressor() : Decompressor(CompressType::LZ4FRAME) {}
195
    Status init() override;
196
};
197
198
class SnappyBlockDecompressor : public Decompressor {
199
public:
200
0
    ~SnappyBlockDecompressor() override {}
201
202
    Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
203
                      size_t output_max_len, size_t* decompressed_len, bool* stream_end,
204
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
205
206
    std::string debug_info() override;
207
208
private:
209
    friend class Decompressor;
210
0
    SnappyBlockDecompressor() : Decompressor(CompressType::SNAPPYBLOCK) {}
211
    Status init() override;
212
};
213
214
class LzopDecompressor : public Decompressor {
215
public:
216
0
    ~LzopDecompressor() override = default;
217
218
    Status decompress(uint8_t* input, size_t input_len, size_t* input_bytes_read, uint8_t* output,
219
                      size_t output_max_len, size_t* decompressed_len, bool* stream_end,
220
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
221
222
    std::string debug_info() override;
223
224
private:
225
    friend class Decompressor;
226
    LzopDecompressor()
227
0
            : Decompressor(CompressType::LZOP), _header_info(), _is_header_loaded(false) {}
228
    Status init() override;
229
230
private:
231
    enum LzoChecksum { CHECK_NONE, CHECK_CRC32, CHECK_ADLER };
232
233
private:
234
0
    uint8_t* get_uint8(uint8_t* ptr, uint8_t* value) {
235
0
        *value = *ptr;
236
0
        return ptr + sizeof(uint8_t);
237
0
    }
238
239
0
    uint8_t* get_uint16(uint8_t* ptr, uint16_t* value) {
240
0
        *value = *ptr << 8 | *(ptr + 1);
241
0
        return ptr + sizeof(uint16_t);
242
0
    }
243
244
0
    uint8_t* get_uint32(uint8_t* ptr, uint32_t* value) {
245
0
        *value = (*ptr << 24) | (*(ptr + 1) << 16) | (*(ptr + 2) << 8) | *(ptr + 3);
246
0
        return ptr + sizeof(uint32_t);
247
0
    }
248
249
0
    LzoChecksum header_type(int flags) { return (flags & F_H_CRC32) ? CHECK_CRC32 : CHECK_ADLER; }
250
251
0
    LzoChecksum input_type(int flags) {
252
0
        return (flags & F_CRC32_C) ? CHECK_CRC32 : (flags & F_ADLER32_C) ? CHECK_ADLER : CHECK_NONE;
253
0
    }
254
255
0
    LzoChecksum output_type(int flags) {
256
0
        return (flags & F_CRC32_D) ? CHECK_CRC32 : (flags & F_ADLER32_D) ? CHECK_ADLER : CHECK_NONE;
257
0
    }
258
259
    Status parse_header_info(uint8_t* input, size_t input_len, size_t* input_bytes_read,
260
                             size_t* more_bytes_needed);
261
262
    Status checksum(LzoChecksum type, const std::string& source, uint32_t expected, uint8_t* ptr,
263
                    size_t len);
264
265
private:
266
    // lzop header info
267
    struct HeaderInfo {
268
        uint16_t version;
269
        uint16_t lib_version;
270
        uint16_t version_needed;
271
        uint8_t method;
272
        std::string filename;
273
        uint32_t header_size;
274
        LzoChecksum header_checksum_type;
275
        LzoChecksum input_checksum_type;
276
        LzoChecksum output_checksum_type;
277
    };
278
279
    struct HeaderInfo _header_info;
280
281
    // true if header is decompressed and loaded
282
    bool _is_header_loaded;
283
284
private:
285
    const static uint8_t LZOP_MAGIC[9];
286
    const static uint64_t LZOP_VERSION;
287
    const static uint64_t MIN_LZO_VERSION;
288
    const static uint32_t MIN_HEADER_SIZE;
289
    const static uint32_t LZO_MAX_BLOCK_SIZE;
290
291
    const static uint32_t CRC32_INIT_VALUE;
292
    const static uint32_t ADLER32_INIT_VALUE;
293
294
    const static uint64_t F_H_CRC32;
295
    const static uint64_t F_MASK;
296
    const static uint64_t F_OS_MASK;
297
    const static uint64_t F_CS_MASK;
298
    const static uint64_t F_RESERVED;
299
    const static uint64_t F_MULTIPART;
300
    const static uint64_t F_H_FILTER;
301
    const static uint64_t F_H_EXTRA_FIELD;
302
    const static uint64_t F_CRC32_C;
303
    const static uint64_t F_ADLER32_C;
304
    const static uint64_t F_CRC32_D;
305
    const static uint64_t F_ADLER32_D;
306
};
307
308
} // namespace doris