Coverage Report

Created: 2025-09-15 21:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/exec/decompressor.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bzlib.h>
21
#include <lz4/lz4.h>
22
#include <lz4/lz4frame.h>
23
#include <lz4/lz4hc.h>
24
#include <snappy.h>
25
#include <stddef.h>
26
#include <stdint.h>
27
#include <zlib.h>
28
#include <zstd.h>
29
30
#include <memory>
31
#include <string>
32
33
#include "common/status.h"
34
#include "gen_cpp/PlanNodes_types.h"
35
36
namespace doris {
37
#include "common/compile_check_begin.h"
38
39
// Compression codecs a Decompressor can be created for
// (see Decompressor::create_decompressor(CompressType, ...)).
// The explicit values spell out the implicit 0-based numbering so the
// numeric value of every enumerator is visible at a glance.
enum CompressType {
    UNCOMPRESSED = 0,
    GZIP = 1,
    DEFLATE = 2,
    BZIP2 = 3,
    ZSTD = 4,
    LZ4FRAME = 5,
    LZOP = 6,
    LZ4BLOCK = 7,
    SNAPPYBLOCK = 8,
};
50
51
class Decompressor {
52
public:
53
0
    virtual ~Decompressor() = default;
54
55
    // implement in derived class
56
    // input(in):               buf where decompress begin
57
    // input_len(in):           max length of input buf
58
    // input_bytes_read(out):   bytes which is consumed by decompressor
59
    // output(out):             buf where to save decompressed data
60
    // output_max_len(in):      max length of output buf
61
    // decompressed_len(out):   decompressed data size in output buf
62
    // stream_end(out):         true if reach the and of stream,
63
    //                          or normally finished decompressing entire block
64
    // more_input_bytes(out):   decompressor need more bytes to consume
65
    // more_output_bytes(out):  decompressor need more space to save decompressed data
66
    //
67
    // input and output buf should be allocated and released outside
68
    virtual Status decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read,
69
                              uint8_t* output, uint32_t output_max_len, size_t* decompressed_len,
70
                              bool* stream_end, size_t* more_input_bytes,
71
                              size_t* more_output_bytes) = 0;
72
73
public:
74
    static Status create_decompressor(CompressType type,
75
                                      std::unique_ptr<Decompressor>* decompressor);
76
77
    static Status create_decompressor(TFileCompressType::type type,
78
                                      std::unique_ptr<Decompressor>* decompressor);
79
80
    static Status create_decompressor(TFileFormatType::type type,
81
                                      std::unique_ptr<Decompressor>* decompressor);
82
83
    virtual std::string debug_info();
84
85
0
    CompressType get_type() { return _ctype; }
86
87
protected:
88
    virtual Status init() = 0;
89
90
    static uint32_t _read_int32(uint8_t* buf);
91
92
0
    Decompressor(CompressType ctype) : _ctype(ctype) {}
93
94
    CompressType _ctype;
95
};
96
97
class GzipDecompressor : public Decompressor {
98
public:
99
    ~GzipDecompressor() override;
100
101
    Status decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read, uint8_t* output,
102
                      uint32_t output_max_len, size_t* decompressed_len, bool* stream_end,
103
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
104
105
    std::string debug_info() override;
106
107
private:
108
    friend class Decompressor;
109
    GzipDecompressor(bool is_deflate);
110
    Status init() override;
111
112
private:
113
    bool _is_deflate;
114
115
    z_stream _z_strm;
116
117
    // These are magic numbers from zlib.h.  Not clear why they are not defined there.
118
    const static int WINDOW_BITS = 15;  // Maximum window size
119
    const static int DETECT_CODEC = 32; // Determine if this is libz or gzip from header.
120
};
121
122
class Bzip2Decompressor : public Decompressor {
123
public:
124
    ~Bzip2Decompressor() override;
125
126
    Status decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read, uint8_t* output,
127
                      uint32_t output_max_len, size_t* decompressed_len, bool* stream_end,
128
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
129
130
    std::string debug_info() override;
131
132
private:
133
    friend class Decompressor;
134
0
    Bzip2Decompressor() : Decompressor(CompressType::BZIP2) {}
135
    Status init() override;
136
137
private:
138
    bz_stream _bz_strm;
139
};
140
141
class ZstdDecompressor : public Decompressor {
142
public:
143
    ~ZstdDecompressor() override;
144
145
    Status decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read, uint8_t* output,
146
                      uint32_t output_max_len, size_t* decompressed_len, bool* stream_end,
147
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
148
149
    std::string debug_info() override;
150
151
private:
152
    friend class Decompressor;
153
0
    ZstdDecompressor() : Decompressor(CompressType::ZSTD) {}
154
    Status init() override;
155
156
private:
157
    ZSTD_DStream* _zstd_strm {nullptr};
158
};
159
160
class Lz4FrameDecompressor : public Decompressor {
161
public:
162
    ~Lz4FrameDecompressor() override;
163
164
    Status decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read, uint8_t* output,
165
                      uint32_t output_max_len, size_t* decompressed_len, bool* stream_end,
166
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
167
168
    std::string debug_info() override;
169
170
private:
171
    friend class Decompressor;
172
0
    Lz4FrameDecompressor() : Decompressor(CompressType::LZ4FRAME) {}
173
    Status init() override;
174
175
    size_t get_block_size(const LZ4F_frameInfo_t* info);
176
177
private:
178
    LZ4F_dctx* _dctx = nullptr;
179
    size_t _expect_dec_buf_size;
180
    const static unsigned DORIS_LZ4F_VERSION;
181
};
182
183
class Lz4BlockDecompressor : public Decompressor {
184
public:
185
0
    ~Lz4BlockDecompressor() override {}
186
187
    Status decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read, uint8_t* output,
188
                      uint32_t output_max_len, size_t* decompressed_len, bool* stream_end,
189
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
190
191
    std::string debug_info() override;
192
193
private:
194
    friend class Decompressor;
195
0
    Lz4BlockDecompressor() : Decompressor(CompressType::LZ4FRAME) {}
196
    Status init() override;
197
};
198
199
class SnappyBlockDecompressor : public Decompressor {
200
public:
201
0
    ~SnappyBlockDecompressor() override {}
202
203
    Status decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read, uint8_t* output,
204
                      uint32_t output_max_len, size_t* decompressed_len, bool* stream_end,
205
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
206
207
    std::string debug_info() override;
208
209
private:
210
    friend class Decompressor;
211
0
    SnappyBlockDecompressor() : Decompressor(CompressType::SNAPPYBLOCK) {}
212
    Status init() override;
213
};
214
215
class LzopDecompressor : public Decompressor {
216
public:
217
0
    ~LzopDecompressor() override = default;
218
219
    Status decompress(uint8_t* input, uint32_t input_len, size_t* input_bytes_read, uint8_t* output,
220
                      uint32_t output_max_len, size_t* decompressed_len, bool* stream_end,
221
                      size_t* more_input_bytes, size_t* more_output_bytes) override;
222
223
    std::string debug_info() override;
224
225
private:
226
    friend class Decompressor;
227
    LzopDecompressor()
228
0
            : Decompressor(CompressType::LZOP), _header_info(), _is_header_loaded(false) {}
229
    Status init() override;
230
231
private:
232
    enum LzoChecksum { CHECK_NONE, CHECK_CRC32, CHECK_ADLER };
233
234
private:
235
0
    uint8_t* get_uint8(uint8_t* ptr, uint8_t* value) {
236
0
        *value = *ptr;
237
0
        return ptr + sizeof(uint8_t);
238
0
    }
239
240
0
    uint8_t* get_uint16(uint8_t* ptr, uint16_t* value) {
241
0
        *value = static_cast<uint16_t>(*ptr << 8) | *(ptr + 1);
242
0
        return ptr + sizeof(uint16_t);
243
0
    }
244
245
0
    uint8_t* get_uint32(uint8_t* ptr, uint32_t* value) {
246
0
        *value = (*ptr << 24) | (*(ptr + 1) << 16) | (*(ptr + 2) << 8) | *(ptr + 3);
247
0
        return ptr + sizeof(uint32_t);
248
0
    }
249
250
0
    LzoChecksum header_type(int flags) { return (flags & F_H_CRC32) ? CHECK_CRC32 : CHECK_ADLER; }
251
252
0
    LzoChecksum input_type(int flags) {
253
0
        return (flags & F_CRC32_C) ? CHECK_CRC32 : (flags & F_ADLER32_C) ? CHECK_ADLER : CHECK_NONE;
254
0
    }
255
256
0
    LzoChecksum output_type(int flags) {
257
0
        return (flags & F_CRC32_D) ? CHECK_CRC32 : (flags & F_ADLER32_D) ? CHECK_ADLER : CHECK_NONE;
258
0
    }
259
260
    Status parse_header_info(uint8_t* input, size_t input_len, size_t* input_bytes_read,
261
                             size_t* more_bytes_needed);
262
263
    Status checksum(LzoChecksum type, const std::string& source, uint32_t expected, uint8_t* ptr,
264
                    size_t len);
265
266
private:
267
    // lzop header info
268
    struct HeaderInfo {
269
        uint16_t version;
270
        uint16_t lib_version;
271
        uint16_t version_needed;
272
        uint8_t method;
273
        std::string filename;
274
        uint32_t header_size;
275
        LzoChecksum header_checksum_type;
276
        LzoChecksum input_checksum_type;
277
        LzoChecksum output_checksum_type;
278
    };
279
280
    struct HeaderInfo _header_info;
281
282
    // true if header is decompressed and loaded
283
    bool _is_header_loaded;
284
285
private:
286
    const static uint8_t LZOP_MAGIC[9];
287
    const static uint64_t LZOP_VERSION;
288
    const static uint64_t MIN_LZO_VERSION;
289
    const static uint32_t MIN_HEADER_SIZE;
290
    const static uint32_t LZO_MAX_BLOCK_SIZE;
291
292
    const static uint32_t CRC32_INIT_VALUE;
293
    const static uint32_t ADLER32_INIT_VALUE;
294
295
    const static uint64_t F_H_CRC32;
296
    const static uint64_t F_MASK;
297
    const static uint64_t F_OS_MASK;
298
    const static uint64_t F_CS_MASK;
299
    const static uint64_t F_RESERVED;
300
    const static uint64_t F_MULTIPART;
301
    const static uint64_t F_H_FILTER;
302
    const static uint64_t F_H_EXTRA_FIELD;
303
    const static uint64_t F_CRC32_C;
304
    const static uint64_t F_ADLER32_C;
305
    const static uint64_t F_CRC32_D;
306
    const static uint64_t F_ADLER32_D;
307
};
308
309
#include "common/compile_check_end.h"
310
} // namespace doris