Coverage Report

Created: 2024-11-21 10:56

/root/doris/be/src/util/block_compression.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/block_compression.h"
19
20
#include <bzlib.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <gen_cpp/segment_v2.pb.h>
23
#include <glog/logging.h>
24
25
#include <exception>
26
// Only used on x86 or x86_64
27
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
28
        defined(__i386) || defined(_M_IX86)
29
#include <libdeflate.h>
30
#endif
31
#include <brotli/decode.h>
32
#include <glog/log_severity.h>
33
#include <glog/logging.h>
34
#include <lz4/lz4.h>
35
#include <lz4/lz4frame.h>
36
#include <lz4/lz4hc.h>
37
#include <snappy/snappy-sinksource.h>
38
#include <snappy/snappy.h>
39
#include <zconf.h>
40
#include <zlib.h>
41
#include <zstd.h>
42
#include <zstd_errors.h>
43
44
#include <algorithm>
45
#include <cstdint>
46
#include <limits>
47
#include <mutex>
48
#include <orc/Exceptions.hh>
49
#include <ostream>
50
51
#include "common/config.h"
52
#include "common/factory_creator.h"
53
#include "exec/decompressor.h"
54
#include "gutil/endian.h"
55
#include "gutil/strings/substitute.h"
56
#include "runtime/thread_context.h"
57
#include "util/defer_op.h"
58
#include "util/faststring.h"
59
60
namespace orc {
61
/**
62
 * Decompress the bytes into the output buffer.
63
 * @param inputAddress the start of the input
64
 * @param inputLimit one past the last byte of the input
65
 * @param outputAddress the start of the output buffer
66
 * @param outputLimit one past the last byte of the output buffer
67
 * @result the number of bytes decompressed
68
 */
69
uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
70
                       char* outputLimit);
71
} // namespace orc
72
73
namespace doris {
74
75
// exception safe
76
Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
77
2
                                       faststring* output) {
78
2
    faststring buf;
79
    // we compute total size to avoid more memory copy
80
2
    buf.reserve(uncompressed_size);
81
10
    for (auto& input : inputs) {
82
10
        buf.append(input.data, input.size);
83
10
    }
84
2
    return compress(buf, output);
85
2
}
86
87
11.1k
// Returns true when `uncompressed_size` is strictly greater than the largest
// value representable by int32_t.
bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) {
    constexpr size_t kInt32Max = static_cast<size_t>(std::numeric_limits<int32_t>::max());
    const bool too_large = uncompressed_size > kInt32Max;
    return too_large;
}
90
91
class Lz4BlockCompression : public BlockCompressionCodec {
92
private:
93
    class Context {
94
        ENABLE_FACTORY_CREATOR(Context);
95
96
    public:
97
1
        Context() : ctx(nullptr) {
98
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
99
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
100
1
            buffer = std::make_unique<faststring>();
101
1
        }
102
        LZ4_stream_t* ctx;
103
        std::unique_ptr<faststring> buffer;
104
1
        ~Context() {
105
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
106
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
107
1
            if (ctx) {
108
1
                LZ4_freeStream(ctx);
109
1
            }
110
1
            buffer.reset();
111
1
        }
112
    };
113
114
public:
115
59
    static Lz4BlockCompression* instance() {
116
59
        static Lz4BlockCompression s_instance;
117
59
        return &s_instance;
118
59
    }
119
1
    ~Lz4BlockCompression() override {
120
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
121
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
122
1
        _ctx_pool.clear();
123
1
    }
124
125
44
    Status compress(const Slice& input, faststring* output) override {
126
44
        if (input.size > LZ4_MAX_INPUT_SIZE) {
127
0
            return Status::InvalidArgument(
128
0
                    "LZ4 not support those case(input.size>LZ4_MAX_INPUT_SIZE), maybe you should "
129
0
                    "change "
130
0
                    "fragment_transmission_compression_codec to snappy, input.size={}, "
131
0
                    "LZ4_MAX_INPUT_SIZE={}",
132
0
                    input.size, LZ4_MAX_INPUT_SIZE);
133
0
        }
134
135
44
        std::unique_ptr<Context> context;
136
44
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
137
44
        bool compress_failed = false;
138
44
        Defer defer {[&] {
139
44
            if (!compress_failed) {
140
44
                _release_compression_ctx(std::move(context));
141
44
            }
142
44
        }};
143
144
44
        try {
145
44
            Slice compressed_buf;
146
44
            size_t max_len = max_compressed_len(input.size);
147
44
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
148
                // use output directly
149
0
                output->resize(max_len);
150
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
151
0
                compressed_buf.size = max_len;
152
44
            } else {
153
                // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
154
44
                {
155
                    // context->buffer is resuable between queries, should accouting to
156
                    // global tracker.
157
44
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
158
44
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
159
44
                    context->buffer->resize(max_len);
160
44
                }
161
44
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
162
44
                compressed_buf.size = max_len;
163
44
            }
164
165
44
            size_t compressed_len =
166
44
                    LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data,
167
44
                                               input.size, compressed_buf.size, ACCELARATION);
168
44
            if (compressed_len == 0) {
169
0
                compress_failed = true;
170
0
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
171
0
                                               compressed_buf.size);
172
0
            }
173
44
            output->resize(compressed_len);
174
44
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
175
44
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
176
44
                                    compressed_len);
177
44
            }
178
44
        } catch (...) {
179
            // Do not set compress_failed to release context
180
0
            DCHECK(!compress_failed);
181
0
            return Status::InternalError("Fail to do LZ4Block compress due to exception");
182
0
        }
183
44
        return Status::OK();
184
44
    }
185
186
30
    Status decompress(const Slice& input, Slice* output) override {
187
30
        auto decompressed_len =
188
30
                LZ4_decompress_safe(input.data, output->data, input.size, output->size);
189
30
        if (decompressed_len < 0) {
190
5
            return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len);
191
5
        }
192
25
        output->size = decompressed_len;
193
25
        return Status::OK();
194
30
    }
195
196
44
    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); }
197
198
private:
199
    // reuse LZ4 compress stream
200
44
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
201
44
        std::lock_guard<std::mutex> l(_ctx_mutex);
202
44
        if (_ctx_pool.empty()) {
203
1
            std::unique_ptr<Context> localCtx = Context::create_unique();
204
1
            if (localCtx.get() == nullptr) {
205
0
                return Status::InvalidArgument("new LZ4 context error");
206
0
            }
207
1
            localCtx->ctx = LZ4_createStream();
208
1
            if (localCtx->ctx == nullptr) {
209
0
                return Status::InvalidArgument("LZ4_createStream error");
210
0
            }
211
1
            out = std::move(localCtx);
212
1
            return Status::OK();
213
1
        }
214
43
        out = std::move(_ctx_pool.back());
215
43
        _ctx_pool.pop_back();
216
43
        return Status::OK();
217
44
    }
218
44
    void _release_compression_ctx(std::unique_ptr<Context> context) {
219
44
        DCHECK(context);
220
44
        LZ4_resetStream(context->ctx);
221
44
        std::lock_guard<std::mutex> l(_ctx_mutex);
222
44
        _ctx_pool.push_back(std::move(context));
223
44
    }
224
225
private:
226
    mutable std::mutex _ctx_mutex;
227
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
228
    static const int32_t ACCELARATION = 1;
229
};
230
231
class HadoopLz4BlockCompression : public Lz4BlockCompression {
232
public:
233
0
    HadoopLz4BlockCompression() {
234
0
        Status st = Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor);
235
0
        if (!st.ok()) {
236
0
            LOG(FATAL) << "HadoopLz4BlockCompression construction failed. status = " << st << "\n";
237
0
        }
238
0
    }
239
240
0
    ~HadoopLz4BlockCompression() override = default;
241
242
0
    static HadoopLz4BlockCompression* instance() {
243
0
        static HadoopLz4BlockCompression s_instance;
244
0
        return &s_instance;
245
0
    }
246
247
    // hadoop use block compression for lz4
248
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
249
0
    Status compress(const Slice& input, faststring* output) override {
250
        // be same with hadop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
251
0
        size_t lz4_block_size = config::lz4_compression_block_size;
252
0
        size_t overhead = lz4_block_size / 255 + 16;
253
0
        size_t max_input_size = lz4_block_size - overhead;
254
255
0
        size_t data_len = input.size;
256
0
        char* data = input.data;
257
0
        std::vector<OwnedSlice> buffers;
258
0
        size_t out_len = 0;
259
260
0
        while (data_len > 0) {
261
0
            size_t input_size = std::min(data_len, max_input_size);
262
0
            Slice input_slice(data, input_size);
263
0
            faststring output_data;
264
0
            RETURN_IF_ERROR(Lz4BlockCompression::compress(input_slice, &output_data));
265
0
            out_len += output_data.size();
266
0
            buffers.push_back(output_data.build());
267
0
            data += input_size;
268
0
            data_len -= input_size;
269
0
        }
270
271
        // hadoop block compression: umcompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
272
0
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
273
0
        output->resize(total_output_len);
274
0
        char* output_buffer = (char*)output->data();
275
0
        BigEndian::Store32(output_buffer, input.get_size());
276
0
        output_buffer += 4;
277
0
        for (const auto& buffer : buffers) {
278
0
            auto slice = buffer.slice();
279
0
            BigEndian::Store32(output_buffer, slice.get_size());
280
0
            output_buffer += 4;
281
0
            memcpy(output_buffer, slice.get_data(), slice.get_size());
282
0
            output_buffer += slice.get_size();
283
0
        }
284
285
0
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);
286
287
0
        return Status::OK();
288
0
    }
289
290
0
    Status decompress(const Slice& input, Slice* output) override {
291
0
        size_t input_bytes_read = 0;
292
0
        size_t decompressed_len = 0;
293
0
        size_t more_input_bytes = 0;
294
0
        size_t more_output_bytes = 0;
295
0
        bool stream_end = false;
296
0
        auto st = _decompressor->decompress((uint8_t*)input.data, input.size, &input_bytes_read,
297
0
                                            (uint8_t*)output->data, output->size, &decompressed_len,
298
0
                                            &stream_end, &more_input_bytes, &more_output_bytes);
299
        //try decompress use hadoopLz4 ,if failed fall back lz4.
300
0
        return (st != Status::OK() || stream_end != true)
301
0
                       ? Lz4BlockCompression::decompress(input, output)
302
0
                       : Status::OK();
303
0
    }
304
305
private:
306
    std::unique_ptr<Decompressor> _decompressor;
307
};
308
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
309
class Lz4fBlockCompression : public BlockCompressionCodec {
310
private:
311
    class CContext {
312
        ENABLE_FACTORY_CREATOR(CContext);
313
314
    public:
315
1
        CContext() : ctx(nullptr) {
316
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
317
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
318
1
            buffer = std::make_unique<faststring>();
319
1
        }
320
        LZ4F_compressionContext_t ctx;
321
        std::unique_ptr<faststring> buffer;
322
1
        ~CContext() {
323
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
324
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
325
1
            if (ctx) {
326
1
                LZ4F_freeCompressionContext(ctx);
327
1
            }
328
1
            buffer.reset();
329
1
        }
330
    };
331
    class DContext {
332
        ENABLE_FACTORY_CREATOR(DContext);
333
334
    public:
335
6
        DContext() : ctx(nullptr) {}
336
        LZ4F_decompressionContext_t ctx;
337
6
        ~DContext() {
338
6
            if (ctx) {
339
6
                LZ4F_freeDecompressionContext(ctx);
340
6
            }
341
6
        }
342
    };
343
344
public:
345
20.3k
    static Lz4fBlockCompression* instance() {
346
20.3k
        static Lz4fBlockCompression s_instance;
347
20.3k
        return &s_instance;
348
20.3k
    }
349
1
    ~Lz4fBlockCompression() {
350
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
351
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
352
1
        _ctx_c_pool.clear();
353
1
        _ctx_d_pool.clear();
354
1
    }
355
356
5
    Status compress(const Slice& input, faststring* output) override {
357
5
        std::vector<Slice> inputs {input};
358
5
        return compress(inputs, input.size, output);
359
5
    }
360
361
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
362
11.0k
                    faststring* output) override {
363
11.0k
        return _compress(inputs, uncompressed_size, output);
364
11.0k
    }
365
366
4.99k
    Status decompress(const Slice& input, Slice* output) override {
367
4.99k
        return _decompress(input, output);
368
4.99k
    }
369
370
11.0k
    size_t max_compressed_len(size_t len) override {
371
11.0k
        return std::max(LZ4F_compressBound(len, &_s_preferences),
372
11.0k
                        LZ4F_compressFrameBound(len, &_s_preferences));
373
11.0k
    }
374
375
private:
376
    Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
377
11.0k
                     faststring* output) {
378
11.0k
        std::unique_ptr<CContext> context;
379
11.0k
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
380
11.0k
        bool compress_failed = false;
381
11.0k
        Defer defer {[&] {
382
11.0k
            if (!compress_failed) {
383
11.0k
                _release_compression_ctx(std::move(context));
384
11.0k
            }
385
11.0k
        }};
386
387
11.0k
        try {
388
11.0k
            Slice compressed_buf;
389
11.0k
            size_t max_len = max_compressed_len(uncompressed_size);
390
11.0k
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
391
                // use output directly
392
0
                output->resize(max_len);
393
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
394
0
                compressed_buf.size = max_len;
395
11.0k
            } else {
396
11.0k
                {
397
11.0k
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
398
11.0k
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
399
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
400
11.0k
                    context->buffer->resize(max_len);
401
11.0k
                }
402
11.0k
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
403
11.0k
                compressed_buf.size = max_len;
404
11.0k
            }
405
406
11.0k
            auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
407
11.0k
                                             &_s_preferences);
408
11.0k
            if (LZ4F_isError(wbytes)) {
409
0
                compress_failed = true;
410
0
                return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
411
0
                                               LZ4F_getErrorName(wbytes));
412
0
            }
413
11.0k
            size_t offset = wbytes;
414
11.0k
            for (auto input : inputs) {
415
11.0k
                wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
416
11.0k
                                             compressed_buf.size - offset, input.data, input.size,
417
11.0k
                                             nullptr);
418
11.0k
                if (LZ4F_isError(wbytes)) {
419
0
                    compress_failed = true;
420
0
                    return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
421
0
                                                   LZ4F_getErrorName(wbytes));
422
0
                }
423
11.0k
                offset += wbytes;
424
11.0k
            }
425
11.0k
            wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
426
11.0k
                                      compressed_buf.size - offset, nullptr);
427
11.0k
            if (LZ4F_isError(wbytes)) {
428
0
                compress_failed = true;
429
0
                return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
430
0
                                               LZ4F_getErrorName(wbytes));
431
0
            }
432
11.0k
            offset += wbytes;
433
11.0k
            output->resize(offset);
434
11.0k
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
435
11.0k
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
436
11.0k
            }
437
11.0k
        } catch (...) {
438
            // Do not set compress_failed to release context
439
0
            DCHECK(!compress_failed);
440
0
            return Status::InternalError("Fail to do LZ4F compress due to exception");
441
0
        }
442
443
11.0k
        return Status::OK();
444
11.0k
    }
445
446
4.99k
    Status _decompress(const Slice& input, Slice* output) {
447
4.99k
        bool decompress_failed = false;
448
4.99k
        std::unique_ptr<DContext> context;
449
4.99k
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
450
4.99k
        Defer defer {[&] {
451
4.99k
            if (!decompress_failed) {
452
4.98k
                _release_decompression_ctx(std::move(context));
453
4.98k
            }
454
4.99k
        }};
455
4.99k
        size_t input_size = input.size;
456
4.99k
        auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data,
457
4.99k
                                    &input_size, nullptr);
458
4.99k
        if (LZ4F_isError(lres)) {
459
0
            decompress_failed = true;
460
0
            return Status::InvalidArgument("Fail to do LZ4F decompress, res={}",
461
0
                                           LZ4F_getErrorName(lres));
462
4.99k
        } else if (input_size != input.size) {
463
0
            decompress_failed = true;
464
0
            return Status::InvalidArgument(
465
0
                    strings::Substitute("Fail to do LZ4F decompress: trailing data left in "
466
0
                                        "compressed data, read=$0 vs given=$1",
467
0
                                        input_size, input.size));
468
4.99k
        } else if (lres != 0) {
469
5
            decompress_failed = true;
470
5
            return Status::InvalidArgument(
471
5
                    "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres);
472
5
        }
473
4.98k
        return Status::OK();
474
4.99k
    }
475
476
private:
477
    // acquire a compression ctx from pool, release while finish compress,
478
    // delete if compression failed
479
11.0k
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
480
11.0k
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
481
11.0k
        if (_ctx_c_pool.empty()) {
482
1
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
483
1
            if (localCtx.get() == nullptr) {
484
0
                return Status::InvalidArgument("failed to new LZ4F CContext");
485
0
            }
486
1
            auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION);
487
1
            if (LZ4F_isError(res) != 0) {
488
0
                return Status::InvalidArgument(strings::Substitute(
489
0
                        "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res)));
490
0
            }
491
1
            out = std::move(localCtx);
492
1
            return Status::OK();
493
1
        }
494
11.0k
        out = std::move(_ctx_c_pool.back());
495
11.0k
        _ctx_c_pool.pop_back();
496
11.0k
        return Status::OK();
497
11.0k
    }
498
11.0k
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
499
11.0k
        DCHECK(context);
500
11.0k
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
501
11.0k
        _ctx_c_pool.push_back(std::move(context));
502
11.0k
    }
503
504
4.99k
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
505
4.99k
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
506
4.99k
        if (_ctx_d_pool.empty()) {
507
6
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
508
6
            if (localCtx.get() == nullptr) {
509
0
                return Status::InvalidArgument("failed to new LZ4F DContext");
510
0
            }
511
6
            auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION);
512
6
            if (LZ4F_isError(res) != 0) {
513
0
                return Status::InvalidArgument(strings::Substitute(
514
0
                        "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res)));
515
0
            }
516
6
            out = std::move(localCtx);
517
6
            return Status::OK();
518
6
        }
519
4.98k
        out = std::move(_ctx_d_pool.back());
520
4.98k
        _ctx_d_pool.pop_back();
521
4.98k
        return Status::OK();
522
4.99k
    }
523
4.98k
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
524
4.98k
        DCHECK(context);
525
        // reset decompression context to avoid ERROR_maxBlockSize_invalid
526
4.98k
        LZ4F_resetDecompressionContext(context->ctx);
527
4.98k
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
528
4.98k
        _ctx_d_pool.push_back(std::move(context));
529
4.98k
    }
530
531
private:
532
    static LZ4F_preferences_t _s_preferences;
533
534
    std::mutex _ctx_c_mutex;
535
    // LZ4F_compressionContext_t is a pointer so no copy here
536
    std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
537
538
    std::mutex _ctx_d_mutex;
539
    std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
540
};
541
542
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
543
        {LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0ULL, 0U,
544
         LZ4F_noBlockChecksum},
545
        0,
546
        0u,
547
        0u,
548
        {0u, 0u, 0u}};
549
550
class Lz4HCBlockCompression : public BlockCompressionCodec {
551
private:
552
    class Context {
553
        ENABLE_FACTORY_CREATOR(Context);
554
555
    public:
556
1
        Context() : ctx(nullptr) {
557
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
558
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
559
1
            buffer = std::make_unique<faststring>();
560
1
        }
561
        LZ4_streamHC_t* ctx;
562
        std::unique_ptr<faststring> buffer;
563
1
        ~Context() {
564
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
565
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
566
1
            if (ctx) {
567
1
                LZ4_freeStreamHC(ctx);
568
1
            }
569
1
            buffer.reset();
570
1
        }
571
    };
572
573
public:
574
2
    static Lz4HCBlockCompression* instance() {
575
2
        static Lz4HCBlockCompression s_instance;
576
2
        return &s_instance;
577
2
    }
578
1
    ~Lz4HCBlockCompression() {
579
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
580
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
581
1
        _ctx_pool.clear();
582
1
    }
583
584
6
    Status compress(const Slice& input, faststring* output) override {
585
6
        std::unique_ptr<Context> context;
586
6
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
587
6
        bool compress_failed = false;
588
6
        Defer defer {[&] {
589
6
            if (!compress_failed) {
590
6
                _release_compression_ctx(std::move(context));
591
6
            }
592
6
        }};
593
594
6
        try {
595
6
            Slice compressed_buf;
596
6
            size_t max_len = max_compressed_len(input.size);
597
6
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
598
                // use output directly
599
0
                output->resize(max_len);
600
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
601
0
                compressed_buf.size = max_len;
602
6
            } else {
603
6
                {
604
6
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
605
6
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
606
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
607
6
                    context->buffer->resize(max_len);
608
6
                }
609
6
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
610
6
                compressed_buf.size = max_len;
611
6
            }
612
613
6
            size_t compressed_len = LZ4_compress_HC_continue(
614
6
                    context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size);
615
6
            if (compressed_len == 0) {
616
0
                compress_failed = true;
617
0
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
618
0
                                               compressed_buf.size);
619
0
            }
620
6
            output->resize(compressed_len);
621
6
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
622
6
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
623
6
                                    compressed_len);
624
6
            }
625
6
        } catch (...) {
626
            // Do not set compress_failed to release context
627
0
            DCHECK(!compress_failed);
628
0
            return Status::InternalError("Fail to do LZ4HC compress due to exception");
629
0
        }
630
6
        return Status::OK();
631
6
    }
632
633
11
    Status decompress(const Slice& input, Slice* output) override {
634
11
        auto decompressed_len =
635
11
                LZ4_decompress_safe(input.data, output->data, input.size, output->size);
636
11
        if (decompressed_len < 0) {
637
5
            return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len);
638
5
        }
639
6
        output->size = decompressed_len;
640
6
        return Status::OK();
641
11
    }
642
643
6
    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); }
644
645
private:
646
6
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
647
6
        std::lock_guard<std::mutex> l(_ctx_mutex);
648
6
        if (_ctx_pool.empty()) {
649
1
            std::unique_ptr<Context> localCtx = Context::create_unique();
650
1
            if (localCtx.get() == nullptr) {
651
0
                return Status::InvalidArgument("new LZ4HC context error");
652
0
            }
653
1
            localCtx->ctx = LZ4_createStreamHC();
654
1
            if (localCtx->ctx == nullptr) {
655
0
                return Status::InvalidArgument("LZ4_createStreamHC error");
656
0
            }
657
1
            out = std::move(localCtx);
658
1
            return Status::OK();
659
1
        }
660
5
        out = std::move(_ctx_pool.back());
661
5
        _ctx_pool.pop_back();
662
5
        return Status::OK();
663
6
    }
664
6
    void _release_compression_ctx(std::unique_ptr<Context> context) {
665
6
        DCHECK(context);
666
6
        LZ4_resetStreamHC_fast(context->ctx, _compression_level);
667
6
        std::lock_guard<std::mutex> l(_ctx_mutex);
668
6
        _ctx_pool.push_back(std::move(context));
669
6
    }
670
671
private:
672
    int64_t _compression_level = config::LZ4_HC_compression_level;
673
    mutable std::mutex _ctx_mutex;
674
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
675
};
676
677
class SnappySlicesSource : public snappy::Source {
678
public:
679
    SnappySlicesSource(const std::vector<Slice>& slices)
680
1
            : _available(0), _cur_slice(0), _slice_off(0) {
681
5
        for (auto& slice : slices) {
682
            // We filter empty slice here to avoid complicated process
683
5
            if (slice.size == 0) {
684
1
                continue;
685
1
            }
686
4
            _available += slice.size;
687
4
            _slices.push_back(slice);
688
4
        }
689
1
    }
690
1
    ~SnappySlicesSource() override {}
691
692
    // Return the number of bytes left to read from the source
693
1
    size_t Available() const override { return _available; }
694
695
    // Peek at the next flat region of the source.  Does not reposition
696
    // the source.  The returned region is empty iff Available()==0.
697
    //
698
    // Returns a pointer to the beginning of the region and store its
699
    // length in *len.
700
    //
701
    // The returned region is valid until the next call to Skip() or
702
    // until this object is destroyed, whichever occurs first.
703
    //
704
    // The returned region may be larger than Available() (for example
705
    // if this ByteSource is a view on a substring of a larger source).
706
    // The caller is responsible for ensuring that it only reads the
707
    // Available() bytes.
708
19
    const char* Peek(size_t* len) override {
709
19
        if (_available == 0) {
710
0
            *len = 0;
711
0
            return nullptr;
712
0
        }
713
        // we should assure that *len is not 0
714
19
        *len = _slices[_cur_slice].size - _slice_off;
715
19
        DCHECK(*len != 0);
716
19
        return _slices[_cur_slice].data + _slice_off;
717
19
    }
718
719
    // Skip the next n bytes.  Invalidates any buffer returned by
720
    // a previous call to Peek().
721
    // REQUIRES: Available() >= n
722
20
    void Skip(size_t n) override {
723
20
        _available -= n;
724
24
        while (n > 0) {
725
19
            auto left = _slices[_cur_slice].size - _slice_off;
726
19
            if (left > n) {
727
                // n can be digest in current slice
728
15
                _slice_off += n;
729
15
                return;
730
15
            }
731
4
            _slice_off = 0;
732
4
            _cur_slice++;
733
4
            n -= left;
734
4
        }
735
20
    }
736
737
private:
738
    std::vector<Slice> _slices;
739
    size_t _available;
740
    size_t _cur_slice;
741
    size_t _slice_off;
742
};
743
744
class SnappyBlockCompression : public BlockCompressionCodec {
745
public:
746
86
    static SnappyBlockCompression* instance() {
747
86
        static SnappyBlockCompression s_instance;
748
86
        return &s_instance;
749
86
    }
750
1
    ~SnappyBlockCompression() override = default;
751
752
34
    Status compress(const Slice& input, faststring* output) override {
753
34
        size_t max_len = max_compressed_len(input.size);
754
34
        output->resize(max_len);
755
34
        Slice s(*output);
756
757
34
        snappy::RawCompress(input.data, input.size, s.data, &s.size);
758
34
        output->resize(s.size);
759
34
        return Status::OK();
760
34
    }
761
762
76
    Status decompress(const Slice& input, Slice* output) override {
763
76
        if (!snappy::RawUncompress(input.data, input.size, output->data)) {
764
0
            return Status::InvalidArgument("Fail to do Snappy decompress");
765
0
        }
766
        // NOTE: GetUncompressedLength only takes O(1) time
767
76
        snappy::GetUncompressedLength(input.data, input.size, &output->size);
768
76
        return Status::OK();
769
76
    }
770
771
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
772
1
                    faststring* output) override {
773
1
        auto max_len = max_compressed_len(uncompressed_size);
774
1
        output->resize(max_len);
775
776
1
        SnappySlicesSource source(inputs);
777
1
        snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data()));
778
1
        output->resize(snappy::Compress(&source, &sink));
779
1
        return Status::OK();
780
1
    }
781
782
35
    size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); }
783
};
784
785
class HadoopSnappyBlockCompression : public SnappyBlockCompression {
786
public:
787
0
    static HadoopSnappyBlockCompression* instance() {
788
0
        static HadoopSnappyBlockCompression s_instance;
789
0
        return &s_instance;
790
0
    }
791
0
    ~HadoopSnappyBlockCompression() override = default;
792
793
    // hadoop use block compression for snappy
794
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
795
0
    Status compress(const Slice& input, faststring* output) override {
796
        // be same with hadop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
797
0
        size_t snappy_block_size = config::snappy_compression_block_size;
798
0
        size_t overhead = snappy_block_size / 6 + 32;
799
0
        size_t max_input_size = snappy_block_size - overhead;
800
801
0
        size_t data_len = input.size;
802
0
        char* data = input.data;
803
0
        std::vector<OwnedSlice> buffers;
804
0
        size_t out_len = 0;
805
806
0
        while (data_len > 0) {
807
0
            size_t input_size = std::min(data_len, max_input_size);
808
0
            Slice input_slice(data, input_size);
809
0
            faststring output_data;
810
0
            RETURN_IF_ERROR(SnappyBlockCompression::compress(input_slice, &output_data));
811
0
            out_len += output_data.size();
812
            // the OwnedSlice will be moved here
813
0
            buffers.push_back(output_data.build());
814
0
            data += input_size;
815
0
            data_len -= input_size;
816
0
        }
817
818
        // hadoop block compression: umcompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
819
0
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
820
0
        output->resize(total_output_len);
821
0
        char* output_buffer = (char*)output->data();
822
0
        BigEndian::Store32(output_buffer, input.get_size());
823
0
        output_buffer += 4;
824
0
        for (const auto& buffer : buffers) {
825
0
            auto slice = buffer.slice();
826
0
            BigEndian::Store32(output_buffer, slice.get_size());
827
0
            output_buffer += 4;
828
0
            memcpy(output_buffer, slice.get_data(), slice.get_size());
829
0
            output_buffer += slice.get_size();
830
0
        }
831
832
0
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);
833
834
0
        return Status::OK();
835
0
    }
836
837
0
    Status decompress(const Slice& input, Slice* output) override {
838
0
        return Status::InternalError("unimplement: SnappyHadoopBlockCompression::decompress");
839
0
    }
840
};
841
842
class ZlibBlockCompression : public BlockCompressionCodec {
843
public:
844
2
    static ZlibBlockCompression* instance() {
845
2
        static ZlibBlockCompression s_instance;
846
2
        return &s_instance;
847
2
    }
848
1
    ~ZlibBlockCompression() override = default;
849
850
5
    Status compress(const Slice& input, faststring* output) override {
851
5
        size_t max_len = max_compressed_len(input.size);
852
5
        output->resize(max_len);
853
5
        Slice s(*output);
854
855
5
        auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size);
856
5
        if (zres != Z_OK) {
857
0
            return Status::InvalidArgument("Fail to do ZLib compress, error={}", zError(zres));
858
0
        }
859
5
        output->resize(s.size);
860
5
        return Status::OK();
861
5
    }
862
863
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
864
1
                    faststring* output) override {
865
1
        size_t max_len = max_compressed_len(uncompressed_size);
866
1
        output->resize(max_len);
867
868
1
        z_stream zstrm;
869
1
        zstrm.zalloc = Z_NULL;
870
1
        zstrm.zfree = Z_NULL;
871
1
        zstrm.opaque = Z_NULL;
872
1
        auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
873
1
        if (zres != Z_OK) {
874
0
            return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
875
0
                                           zError(zres), zres);
876
0
        }
877
        // we assume that output is e
878
1
        zstrm.next_out = (Bytef*)output->data();
879
1
        zstrm.avail_out = output->size();
880
6
        for (int i = 0; i < inputs.size(); ++i) {
881
5
            if (inputs[i].size == 0) {
882
1
                continue;
883
1
            }
884
4
            zstrm.next_in = (Bytef*)inputs[i].data;
885
4
            zstrm.avail_in = inputs[i].size;
886
4
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;
887
888
4
            zres = deflate(&zstrm, flush);
889
4
            if (zres != Z_OK && zres != Z_STREAM_END) {
890
0
                return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
891
0
                                               zError(zres), zres);
892
0
            }
893
4
        }
894
895
1
        output->resize(zstrm.total_out);
896
1
        zres = deflateEnd(&zstrm);
897
1
        if (zres != Z_OK) {
898
0
            return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
899
0
                                           zError(zres), zres);
900
0
        }
901
1
        return Status::OK();
902
1
    }
903
904
14
    Status decompress(const Slice& input, Slice* output) override {
905
14
        size_t input_size = input.size;
906
14
        auto zres =
907
14
                ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
908
14
        if (zres != Z_OK) {
909
8
            return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres));
910
8
        }
911
6
        return Status::OK();
912
14
    }
913
914
6
    size_t max_compressed_len(size_t len) override {
915
        // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block
916
6
        return len + 6 + 5 * ((len >> 14) + 1);
917
6
    }
918
};
919
920
class Bzip2BlockCompression : public BlockCompressionCodec {
921
public:
922
0
    static Bzip2BlockCompression* instance() {
923
0
        static Bzip2BlockCompression s_instance;
924
0
        return &s_instance;
925
0
    }
926
0
    ~Bzip2BlockCompression() override = default;
927
928
0
    Status compress(const Slice& input, faststring* output) override {
929
0
        size_t max_len = max_compressed_len(input.size);
930
0
        output->resize(max_len);
931
0
        uint32_t size = output->size();
932
0
        auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data,
933
0
                                              input.size, 9, 0, 0);
934
0
        if (bzres != BZ_OK) {
935
0
            return Status::InternalError("Fail to do Bzip2 compress, ret={}", bzres);
936
0
        }
937
0
        output->resize(size);
938
0
        return Status::OK();
939
0
    }
940
941
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
942
0
                    faststring* output) override {
943
0
        size_t max_len = max_compressed_len(uncompressed_size);
944
0
        output->resize(max_len);
945
946
0
        bz_stream bzstrm;
947
0
        bzero(&bzstrm, sizeof(bzstrm));
948
0
        int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0);
949
0
        if (bzres != BZ_OK) {
950
0
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
951
0
        }
952
        // we assume that output is e
953
0
        bzstrm.next_out = (char*)output->data();
954
0
        bzstrm.avail_out = output->size();
955
0
        for (int i = 0; i < inputs.size(); ++i) {
956
0
            if (inputs[i].size == 0) {
957
0
                continue;
958
0
            }
959
0
            bzstrm.next_in = (char*)inputs[i].data;
960
0
            bzstrm.avail_in = inputs[i].size;
961
0
            int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN;
962
963
0
            bzres = BZ2_bzCompress(&bzstrm, flush);
964
0
            if (bzres != BZ_OK && bzres != BZ_STREAM_END) {
965
0
                return Status::InternalError("Fail to do bzip2 stream compress, res={}", bzres);
966
0
            }
967
0
        }
968
969
0
        size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32;
970
0
        output->resize(total_out);
971
0
        bzres = BZ2_bzCompressEnd(&bzstrm);
972
0
        if (bzres != BZ_OK) {
973
0
            return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
974
0
        }
975
0
        return Status::OK();
976
0
    }
977
978
0
    Status decompress(const Slice& input, Slice* output) override {
979
0
        return Status::InternalError("unimplement: Bzip2BlockCompression::decompress");
980
0
    }
981
982
0
    size_t max_compressed_len(size_t len) override {
983
        // TODO: make sure the max_compressed_len for bzip2
984
0
        return len * 2;
985
0
    }
986
};
987
988
// ZSTD compression and decompression codec, offering both high speed and a high compression ratio
989
class ZstdBlockCompression : public BlockCompressionCodec {
990
private:
991
    class CContext {
992
        ENABLE_FACTORY_CREATOR(CContext);
993
994
    public:
995
1
        CContext() : ctx(nullptr) {
996
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
997
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
998
1
            buffer = std::make_unique<faststring>();
999
1
        }
1000
        ZSTD_CCtx* ctx;
1001
        std::unique_ptr<faststring> buffer;
1002
1
        ~CContext() {
1003
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
1004
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
1005
1
            if (ctx) {
1006
1
                ZSTD_freeCCtx(ctx);
1007
1
            }
1008
1
            buffer.reset();
1009
1
        }
1010
    };
1011
    class DContext {
1012
        ENABLE_FACTORY_CREATOR(DContext);
1013
1014
    public:
1015
6
        DContext() : ctx(nullptr) {}
1016
        ZSTD_DCtx* ctx;
1017
6
        ~DContext() {
1018
6
            if (ctx) {
1019
6
                ZSTD_freeDCtx(ctx);
1020
6
            }
1021
6
        }
1022
    };
1023
1024
public:
1025
169
    static ZstdBlockCompression* instance() {
1026
169
        static ZstdBlockCompression s_instance;
1027
169
        return &s_instance;
1028
169
    }
1029
1
    ~ZstdBlockCompression() {
1030
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
1031
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
1032
1
        _ctx_c_pool.clear();
1033
1
        _ctx_d_pool.clear();
1034
1
    }
1035
1036
141
    size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); }
1037
1038
7
    Status compress(const Slice& input, faststring* output) override {
1039
7
        std::vector<Slice> inputs {input};
1040
7
        return compress(inputs, input.size, output);
1041
7
    }
1042
1043
    // follow ZSTD official example
1044
    //  https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
1045
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
1046
141
                    faststring* output) override {
1047
141
        std::unique_ptr<CContext> context;
1048
141
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
1049
141
        bool compress_failed = false;
1050
141
        Defer defer {[&] {
1051
141
            if (!compress_failed) {
1052
141
                _release_compression_ctx(std::move(context));
1053
141
            }
1054
141
        }};
1055
1056
141
        try {
1057
141
            size_t max_len = max_compressed_len(uncompressed_size);
1058
141
            Slice compressed_buf;
1059
141
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
1060
                // use output directly
1061
0
                output->resize(max_len);
1062
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
1063
0
                compressed_buf.size = max_len;
1064
141
            } else {
1065
141
                {
1066
141
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
1067
141
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
1068
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
1069
141
                    context->buffer->resize(max_len);
1070
141
                }
1071
141
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
1072
141
                compressed_buf.size = max_len;
1073
141
            }
1074
1075
            // set compression level to default 3
1076
141
            auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel,
1077
141
                                              ZSTD_CLEVEL_DEFAULT);
1078
141
            if (ZSTD_isError(ret)) {
1079
0
                return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
1080
0
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
1081
0
            }
1082
            // set checksum flag to 1
1083
141
            ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
1084
141
            if (ZSTD_isError(ret)) {
1085
0
                return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
1086
0
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
1087
0
            }
1088
1089
141
            ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};
1090
1091
286
            for (size_t i = 0; i < inputs.size(); i++) {
1092
145
                ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};
1093
1094
145
                bool last_input = (i == inputs.size() - 1);
1095
145
                auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;
1096
1097
145
                bool finished = false;
1098
145
                do {
1099
                    // do compress
1100
145
                    auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);
1101
1102
145
                    if (ZSTD_isError(ret)) {
1103
0
                        compress_failed = true;
1104
0
                        return Status::InvalidArgument("ZSTD_compressStream2 error: {}",
1105
0
                                                       ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
1106
0
                    }
1107
1108
                    // ret is ZSTD hint for needed output buffer size
1109
145
                    if (ret > 0 && out_buf.pos == out_buf.size) {
1110
0
                        compress_failed = true;
1111
0
                        return Status::InvalidArgument("ZSTD_compressStream2 output buffer full");
1112
0
                    }
1113
1114
145
                    finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
1115
145
                } while (!finished);
1116
145
            }
1117
1118
            // set compressed size for caller
1119
141
            output->resize(out_buf.pos);
1120
141
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
1121
141
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
1122
141
            }
1123
141
        } catch (std::exception& e) {
1124
0
            return Status::InternalError("Fail to do ZSTD compress due to exception {}", e.what());
1125
0
        } catch (...) {
1126
            // Do not set compress_failed to release context
1127
0
            DCHECK(!compress_failed);
1128
0
            return Status::InternalError("Fail to do ZSTD compress due to exception");
1129
0
        }
1130
1131
141
        return Status::OK();
1132
141
    }
1133
1134
132
    Status decompress(const Slice& input, Slice* output) override {
1135
132
        std::unique_ptr<DContext> context;
1136
132
        bool decompress_failed = false;
1137
132
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
1138
132
        Defer defer {[&] {
1139
132
            if (!decompress_failed) {
1140
127
                _release_decompression_ctx(std::move(context));
1141
127
            }
1142
132
        }};
1143
1144
132
        size_t ret = ZSTD_decompressDCtx(context->ctx, output->data, output->size, input.data,
1145
132
                                         input.size);
1146
132
        if (ZSTD_isError(ret)) {
1147
5
            decompress_failed = true;
1148
5
            return Status::InvalidArgument("ZSTD_decompressDCtx error: {}",
1149
5
                                           ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
1150
5
        }
1151
1152
        // set decompressed size for caller
1153
127
        output->size = ret;
1154
1155
127
        return Status::OK();
1156
132
    }
1157
1158
private:
1159
141
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
1160
141
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
1161
141
        if (_ctx_c_pool.empty()) {
1162
1
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
1163
1
            if (localCtx.get() == nullptr) {
1164
0
                return Status::InvalidArgument("failed to new ZSTD CContext");
1165
0
            }
1166
            //typedef LZ4F_cctx* LZ4F_compressionContext_t;
1167
1
            localCtx->ctx = ZSTD_createCCtx();
1168
1
            if (localCtx->ctx == nullptr) {
1169
0
                return Status::InvalidArgument("Failed to create ZSTD compress ctx");
1170
0
            }
1171
1
            out = std::move(localCtx);
1172
1
            return Status::OK();
1173
1
        }
1174
140
        out = std::move(_ctx_c_pool.back());
1175
140
        _ctx_c_pool.pop_back();
1176
140
        return Status::OK();
1177
141
    }
1178
141
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
1179
141
        DCHECK(context);
1180
141
        auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only);
1181
141
        DCHECK(!ZSTD_isError(ret));
1182
141
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
1183
141
        _ctx_c_pool.push_back(std::move(context));
1184
141
    }
1185
1186
132
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
1187
132
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
1188
132
        if (_ctx_d_pool.empty()) {
1189
6
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
1190
6
            if (localCtx.get() == nullptr) {
1191
0
                return Status::InvalidArgument("failed to new ZSTD DContext");
1192
0
            }
1193
6
            localCtx->ctx = ZSTD_createDCtx();
1194
6
            if (localCtx->ctx == nullptr) {
1195
0
                return Status::InvalidArgument("Fail to init ZSTD decompress context");
1196
0
            }
1197
6
            out = std::move(localCtx);
1198
6
            return Status::OK();
1199
6
        }
1200
126
        out = std::move(_ctx_d_pool.back());
1201
126
        _ctx_d_pool.pop_back();
1202
126
        return Status::OK();
1203
132
    }
1204
127
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
1205
127
        DCHECK(context);
1206
        // reset ctx to start a new decompress session
1207
127
        auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only);
1208
127
        DCHECK(!ZSTD_isError(ret));
1209
127
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
1210
127
        _ctx_d_pool.push_back(std::move(context));
1211
127
    }
1212
1213
private:
1214
    mutable std::mutex _ctx_c_mutex;
1215
    mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
1216
1217
    mutable std::mutex _ctx_d_mutex;
1218
    mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
1219
};
1220
1221
class GzipBlockCompression : public ZlibBlockCompression {
1222
public:
1223
0
    static GzipBlockCompression* instance() {
1224
0
        static GzipBlockCompression s_instance;
1225
0
        return &s_instance;
1226
0
    }
1227
0
    ~GzipBlockCompression() override = default;
1228
1229
0
    Status compress(const Slice& input, faststring* output) override {
1230
0
        size_t max_len = max_compressed_len(input.size);
1231
0
        output->resize(max_len);
1232
1233
0
        z_stream z_strm = {};
1234
0
        z_strm.zalloc = Z_NULL;
1235
0
        z_strm.zfree = Z_NULL;
1236
0
        z_strm.opaque = Z_NULL;
1237
1238
0
        int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
1239
0
                                8, Z_DEFAULT_STRATEGY);
1240
1241
0
        if (zres != Z_OK) {
1242
0
            return Status::InvalidArgument("Fail to init zlib compress");
1243
0
        }
1244
1245
0
        z_strm.next_in = (Bytef*)input.get_data();
1246
0
        z_strm.avail_in = input.get_size();
1247
0
        z_strm.next_out = (Bytef*)output->data();
1248
0
        z_strm.avail_out = output->size();
1249
1250
0
        zres = deflate(&z_strm, Z_FINISH);
1251
0
        if (zres != Z_OK && zres != Z_STREAM_END) {
1252
0
            return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
1253
0
                                           zError(zres), zres);
1254
0
        }
1255
1256
0
        output->resize(z_strm.total_out);
1257
0
        zres = deflateEnd(&z_strm);
1258
0
        if (zres != Z_OK) {
1259
0
            return Status::InvalidArgument("Fail to end zlib compress");
1260
0
        }
1261
0
        return Status::OK();
1262
0
    }
1263
1264
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
1265
0
                    faststring* output) override {
1266
0
        size_t max_len = max_compressed_len(uncompressed_size);
1267
0
        output->resize(max_len);
1268
1269
0
        z_stream zstrm;
1270
0
        zstrm.zalloc = Z_NULL;
1271
0
        zstrm.zfree = Z_NULL;
1272
0
        zstrm.opaque = Z_NULL;
1273
0
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
1274
0
                                 8, Z_DEFAULT_STRATEGY);
1275
0
        if (zres != Z_OK) {
1276
0
            return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
1277
0
                                           zError(zres), zres);
1278
0
        }
1279
        // we assume that output is e
1280
0
        zstrm.next_out = (Bytef*)output->data();
1281
0
        zstrm.avail_out = output->size();
1282
0
        for (int i = 0; i < inputs.size(); ++i) {
1283
0
            if (inputs[i].size == 0) {
1284
0
                continue;
1285
0
            }
1286
0
            zstrm.next_in = (Bytef*)inputs[i].data;
1287
0
            zstrm.avail_in = inputs[i].size;
1288
0
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;
1289
1290
0
            zres = deflate(&zstrm, flush);
1291
0
            if (zres != Z_OK && zres != Z_STREAM_END) {
1292
0
                return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}",
1293
0
                                               zError(zres), zres);
1294
0
            }
1295
0
        }
1296
1297
0
        output->resize(zstrm.total_out);
1298
0
        zres = deflateEnd(&zstrm);
1299
0
        if (zres != Z_OK) {
1300
0
            return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
1301
0
                                           zError(zres), zres);
1302
0
        }
1303
0
        return Status::OK();
1304
0
    }
1305
1306
0
    Status decompress(const Slice& input, Slice* output) override {
1307
0
        z_stream z_strm = {};
1308
0
        z_strm.zalloc = Z_NULL;
1309
0
        z_strm.zfree = Z_NULL;
1310
0
        z_strm.opaque = Z_NULL;
1311
1312
0
        int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
1313
0
        if (ret != Z_OK) {
1314
0
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
1315
0
                                         zError(ret), ret);
1316
0
        }
1317
1318
        // 1. set input and output
1319
0
        z_strm.next_in = reinterpret_cast<Bytef*>(input.data);
1320
0
        z_strm.avail_in = input.size;
1321
0
        z_strm.next_out = reinterpret_cast<Bytef*>(output->data);
1322
0
        z_strm.avail_out = output->size;
1323
1324
0
        if (z_strm.avail_out > 0) {
1325
            // We only support non-streaming use case  for block decompressor
1326
0
            ret = inflate(&z_strm, Z_FINISH);
1327
0
            if (ret != Z_OK && ret != Z_STREAM_END) {
1328
0
                (void)inflateEnd(&z_strm);
1329
0
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
1330
0
                                             zError(ret), ret);
1331
0
            }
1332
0
        }
1333
0
        (void)inflateEnd(&z_strm);
1334
1335
0
        return Status::OK();
1336
0
    }
1337
1338
0
    size_t max_compressed_len(size_t len) override {
1339
0
        z_stream zstrm;
1340
0
        zstrm.zalloc = Z_NULL;
1341
0
        zstrm.zfree = Z_NULL;
1342
0
        zstrm.opaque = Z_NULL;
1343
0
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
1344
0
                                 MEM_LEVEL, Z_DEFAULT_STRATEGY);
1345
0
        if (zres != Z_OK) {
1346
            // Fall back to zlib estimate logic for deflate, notice this may
1347
            // cause decompress error
1348
0
            LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres)
1349
0
                         << ", res=" << zres;
1350
0
            return ZlibBlockCompression::max_compressed_len(len);
1351
0
        } else {
1352
0
            zres = deflateEnd(&zstrm);
1353
0
            if (zres != Z_OK) {
1354
0
                LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres)
1355
0
                             << ", res=" << zres;
1356
0
            }
1357
            // Mark, maintainer of zlib, has stated that 12 needs to be added to
1358
            // result for gzip
1359
            // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
1360
            // To have a safe upper bound for "wrapper variations", we add 32 to
1361
            // estimate
1362
0
            int upper_bound = deflateBound(&zstrm, len) + 32;
1363
0
            return upper_bound;
1364
0
        }
1365
0
    }
1366
1367
private:
1368
    // Magic number for zlib, see https://zlib.net/manual.html for more details.
1369
    const static int GZIP_CODEC = 16; // gzip
1370
    // The memLevel parameter specifies how much memory should be allocated for
1371
    // the internal compression state.
1372
    const static int MEM_LEVEL = 8;
1373
};
1374
1375
// Only used on x86 or x86_64
1376
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
1377
        defined(__i386) || defined(_M_IX86)
1378
class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression {
1379
public:
1380
0
    GzipBlockCompressionByLibdeflate() : GzipBlockCompression() {}
1381
0
    static GzipBlockCompressionByLibdeflate* instance() {
1382
0
        static GzipBlockCompressionByLibdeflate s_instance;
1383
0
        return &s_instance;
1384
0
    }
1385
0
    ~GzipBlockCompressionByLibdeflate() override = default;
1386
1387
0
    Status decompress(const Slice& input, Slice* output) override {
1388
0
        if (input.empty()) {
1389
0
            output->size = 0;
1390
0
            return Status::OK();
1391
0
        }
1392
0
        thread_local std::unique_ptr<libdeflate_decompressor, void (*)(libdeflate_decompressor*)>
1393
0
                decompressor {libdeflate_alloc_decompressor(), libdeflate_free_decompressor};
1394
0
        if (!decompressor) {
1395
0
            return Status::InternalError("libdeflate_alloc_decompressor error.");
1396
0
        }
1397
0
        std::size_t out_len;
1398
0
        auto result = libdeflate_gzip_decompress(decompressor.get(), input.data, input.size,
1399
0
                                                 output->data, output->size, &out_len);
1400
0
        if (result != LIBDEFLATE_SUCCESS) {
1401
0
            return Status::InternalError("libdeflate_gzip_decompress error, res={}", result);
1402
0
        }
1403
0
        return Status::OK();
1404
0
    }
1405
};
1406
#endif
1407
1408
class LzoBlockCompression final : public BlockCompressionCodec {
1409
public:
1410
0
    static LzoBlockCompression* instance() {
1411
0
        static LzoBlockCompression s_instance;
1412
0
        return &s_instance;
1413
0
    }
1414
1415
0
    Status compress(const Slice& input, faststring* output) override {
1416
0
        return Status::InvalidArgument("not impl lzo compress.");
1417
0
    }
1418
0
    size_t max_compressed_len(size_t len) override { return 0; };
1419
0
    Status decompress(const Slice& input, Slice* output) override {
1420
0
        auto* input_ptr = input.data;
1421
0
        auto remain_input_size = input.size;
1422
0
        auto* output_ptr = output->data;
1423
0
        auto remain_output_size = output->size;
1424
0
        auto* output_limit = output->data + output->size;
1425
1426
        // Example:
1427
        // OriginData(The original data will be divided into several large data block.) :
1428
        //      large data block1 | large data block2 | large data block3 | ....
1429
        // The large data block will be divided into several small data block.
1430
        // Suppose a large data block is divided into three small blocks:
1431
        // large data block1:            | small block1 | small block2 | small block3 |
1432
        // CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block1) ] [B3 compress(small block1)]>
1433
        //
1434
        // A : original length of the current block of large data block.
1435
        // sizeof(A) = 4 bytes.
1436
        // A = length(small block1) + length(small block2) + length(small block3)
1437
        // Bx : length of  small data block bx.
1438
        // sizeof(Bx) = 4 bytes.
1439
        // Bx = length(compress(small blockx))
1440
0
        try {
1441
0
            while (remain_input_size > 0) {
1442
0
                if (remain_input_size < 4) {
1443
0
                    return Status::InvalidArgument(
1444
0
                            "Need more input buffer to get large_block_uncompressed_len.");
1445
0
                }
1446
1447
0
                uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr);
1448
0
                input_ptr += 4;
1449
0
                remain_input_size -= 4;
1450
1451
0
                if (remain_output_size < large_block_uncompressed_len) {
1452
0
                    return Status::InvalidArgument(
1453
0
                            "Need more output buffer to get uncompressed data.");
1454
0
                }
1455
1456
0
                while (large_block_uncompressed_len > 0) {
1457
0
                    if (remain_input_size < 4) {
1458
0
                        return Status::InvalidArgument(
1459
0
                                "Need more input buffer to get small_block_compressed_len.");
1460
0
                    }
1461
1462
0
                    uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr);
1463
0
                    input_ptr += 4;
1464
0
                    remain_input_size -= 4;
1465
1466
0
                    if (remain_input_size < small_block_compressed_len) {
1467
0
                        return Status::InvalidArgument(
1468
0
                                "Need more input buffer to decompress small block.");
1469
0
                    }
1470
1471
0
                    auto small_block_uncompressed_len =
1472
0
                            orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len,
1473
0
                                               output_ptr, output_limit);
1474
1475
0
                    input_ptr += small_block_compressed_len;
1476
0
                    remain_input_size -= small_block_compressed_len;
1477
1478
0
                    output_ptr += small_block_uncompressed_len;
1479
0
                    large_block_uncompressed_len -= small_block_uncompressed_len;
1480
0
                    remain_output_size -= small_block_uncompressed_len;
1481
0
                }
1482
0
            }
1483
0
        } catch (const orc::ParseError& e) {
1484
            //Prevent be from hanging due to orc::lzoDecompress throw exception
1485
0
            return Status::InternalError("Fail to do LZO decompress, error={}", e.what());
1486
0
        }
1487
0
        return Status::OK();
1488
0
    }
1489
};
1490
1491
class BrotliBlockCompression final : public BlockCompressionCodec {
1492
public:
1493
0
    static BrotliBlockCompression* instance() {
1494
0
        static BrotliBlockCompression s_instance;
1495
0
        return &s_instance;
1496
0
    }
1497
1498
0
    Status compress(const Slice& input, faststring* output) override {
1499
0
        return Status::InvalidArgument("not impl brotli compress.");
1500
0
    }
1501
1502
0
    size_t max_compressed_len(size_t len) override { return 0; };
1503
1504
0
    Status decompress(const Slice& input, Slice* output) override {
1505
        // The size of output buffer is always equal to the umcompressed length.
1506
0
        BrotliDecoderResult result = BrotliDecoderDecompress(
1507
0
                input.get_size(), reinterpret_cast<const uint8_t*>(input.get_data()), &output->size,
1508
0
                reinterpret_cast<uint8_t*>(output->data));
1509
0
        if (result != BROTLI_DECODER_RESULT_SUCCESS) {
1510
0
            return Status::InternalError("Brotli decompression failed, result={}", result);
1511
0
        }
1512
0
        return Status::OK();
1513
0
    }
1514
};
1515
1516
// Maps a segment_v2 compression type to its codec singleton.
//
// `*codec` receives the codec, or nullptr for NO_COMPRESSION. For a type that
// is not handled, InternalError is returned and `*codec` is left untouched.
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                   BlockCompressionCodec** codec) {
    using segment_v2::CompressionTypePB;

    // Stays nullptr for NO_COMPRESSION.
    BlockCompressionCodec* selected = nullptr;
    switch (type) {
    case CompressionTypePB::NO_COMPRESSION:
        break;
    case CompressionTypePB::SNAPPY:
        selected = SnappyBlockCompression::instance();
        break;
    case CompressionTypePB::LZ4:
        selected = Lz4BlockCompression::instance();
        break;
    case CompressionTypePB::LZ4F:
        selected = Lz4fBlockCompression::instance();
        break;
    case CompressionTypePB::LZ4HC:
        selected = Lz4HCBlockCompression::instance();
        break;
    case CompressionTypePB::ZLIB:
        selected = ZlibBlockCompression::instance();
        break;
    case CompressionTypePB::ZSTD:
        selected = ZstdBlockCompression::instance();
        break;
    default:
        return Status::InternalError("unknown compression type({})", type);
    }

    *codec = selected;
    return Status::OK();
}
1546
1547
// this can only be used in hive text write
1548
0
Status get_block_compression_codec(TFileCompressType::type type, BlockCompressionCodec** codec) {
1549
0
    switch (type) {
1550
0
    case TFileCompressType::PLAIN:
1551
0
        *codec = nullptr;
1552
0
        break;
1553
0
    case TFileCompressType::ZLIB:
1554
0
        *codec = ZlibBlockCompression::instance();
1555
0
        break;
1556
0
    case TFileCompressType::GZ:
1557
0
        *codec = GzipBlockCompression::instance();
1558
0
        break;
1559
0
    case TFileCompressType::BZ2:
1560
0
        *codec = Bzip2BlockCompression::instance();
1561
0
        break;
1562
0
    case TFileCompressType::LZ4BLOCK:
1563
0
        *codec = HadoopLz4BlockCompression::instance();
1564
0
        break;
1565
0
    case TFileCompressType::SNAPPYBLOCK:
1566
0
        *codec = HadoopSnappyBlockCompression::instance();
1567
0
        break;
1568
0
    case TFileCompressType::ZSTD:
1569
0
        *codec = ZstdBlockCompression::instance();
1570
0
        break;
1571
0
    default:
1572
0
        return Status::InternalError("unsupport compression type({}) int hive text", type);
1573
0
    }
1574
1575
0
    return Status::OK();
1576
0
}
1577
1578
// Maps a parquet compression codec id to its codec singleton.
//
// `*codec` receives the codec, or nullptr for UNCOMPRESSED. For a codec id
// that is not handled, InternalError is returned and `*codec` is left
// untouched.
Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec,
                                   BlockCompressionCodec** codec) {
    using tparquet::CompressionCodec;

// The libdeflate-backed gzip codec is only used on x86 or x86_64
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
        defined(__i386) || defined(_M_IX86)
    using ParquetGzipCodec = GzipBlockCompressionByLibdeflate;
#else
    using ParquetGzipCodec = GzipBlockCompression;
#endif

    // Stays nullptr for UNCOMPRESSED.
    BlockCompressionCodec* chosen = nullptr;
    switch (parquet_codec) {
    case CompressionCodec::UNCOMPRESSED:
        break;
    case CompressionCodec::SNAPPY:
        chosen = SnappyBlockCompression::instance();
        break;
    case CompressionCodec::LZ4_RAW: // we can use LZ4 compression algorithm parse LZ4_RAW
    case CompressionCodec::LZ4:
        chosen = HadoopLz4BlockCompression::instance();
        break;
    case CompressionCodec::ZSTD:
        chosen = ZstdBlockCompression::instance();
        break;
    case CompressionCodec::GZIP:
        chosen = ParquetGzipCodec::instance();
        break;
    case CompressionCodec::LZO:
        chosen = LzoBlockCompression::instance();
        break;
    case CompressionCodec::BROTLI:
        chosen = BrotliBlockCompression::instance();
        break;
    default:
        return Status::InternalError("unknown compression type({})", parquet_codec);
    }

    *codec = chosen;
    return Status::OK();
}
1615
1616
} // namespace doris