Coverage Report

Created: 2025-03-27 13:57

/root/doris/be/src/util/block_compression.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/block_compression.h"
19
20
#include <bzlib.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <gen_cpp/segment_v2.pb.h>
23
#include <glog/logging.h>
24
25
#include <exception>
26
// Only used on x86 or x86_64
27
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
28
        defined(__i386) || defined(_M_IX86)
29
#include <libdeflate.h>
30
#endif
31
#include <brotli/decode.h>
32
#include <glog/log_severity.h>
33
#include <glog/logging.h>
34
#include <lz4/lz4.h>
35
#include <lz4/lz4frame.h>
36
#include <lz4/lz4hc.h>
37
#include <snappy/snappy-sinksource.h>
38
#include <snappy/snappy.h>
39
#include <zconf.h>
40
#include <zlib.h>
41
#include <zstd.h>
42
#include <zstd_errors.h>
43
44
#include <algorithm>
45
#include <cstdint>
46
#include <limits>
47
#include <mutex>
48
#include <orc/Exceptions.hh>
49
#include <ostream>
50
51
#include "common/config.h"
52
#include "common/factory_creator.h"
53
#include "exec/decompressor.h"
54
#include "gutil/endian.h"
55
#include "gutil/strings/substitute.h"
56
#include "runtime/thread_context.h"
57
#include "util/defer_op.h"
58
#include "util/faststring.h"
59
60
namespace orc {
61
/**
62
 * Decompress the bytes in to the output buffer.
63
 * @param inputAddress the start of the input
64
 * @param inputLimit one past the last byte of the input
65
 * @param outputAddress the start of the output buffer
66
 * @param outputLimit one past the last byte of the output buffer
67
 * @result the number of bytes decompressed
68
 */
69
uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
70
                       char* outputLimit);
71
} // namespace orc
72
73
namespace doris {
74
75
// exception safe
76
Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
77
2
                                       faststring* output) {
78
2
    faststring buf;
79
    // we compute total size to avoid more memory copy
80
2
    buf.reserve(uncompressed_size);
81
10
    for (auto& input : inputs) {
82
10
        buf.append(input.data, input.size);
83
10
    }
84
2
    return compress(buf, output);
85
2
}
86
87
15.7k
bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) {
88
15.7k
    return uncompressed_size > std::numeric_limits<int32_t>::max();
89
15.7k
}
90
91
class Lz4BlockCompression : public BlockCompressionCodec {
92
private:
93
    class Context {
94
        ENABLE_FACTORY_CREATOR(Context);
95
96
    public:
97
1
        Context() : ctx(nullptr) {
98
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
99
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
100
1
            buffer = std::make_unique<faststring>();
101
1
        }
102
        LZ4_stream_t* ctx;
103
        std::unique_ptr<faststring> buffer;
104
1
        ~Context() {
105
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
106
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
107
1
            if (ctx) {
108
1
                LZ4_freeStream(ctx);
109
1
            }
110
1
            buffer.reset();
111
1
        }
112
    };
113
114
public:
115
59
    static Lz4BlockCompression* instance() {
116
59
        static Lz4BlockCompression s_instance;
117
59
        return &s_instance;
118
59
    }
119
1
    ~Lz4BlockCompression() override {
120
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
121
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
122
1
        _ctx_pool.clear();
123
1
    }
124
125
44
    Status compress(const Slice& input, faststring* output) override {
126
44
        if (input.size > LZ4_MAX_INPUT_SIZE) {
127
0
            return Status::InvalidArgument(
128
0
                    "LZ4 not support those case(input.size>LZ4_MAX_INPUT_SIZE), maybe you should "
129
0
                    "change "
130
0
                    "fragment_transmission_compression_codec to snappy, input.size={}, "
131
0
                    "LZ4_MAX_INPUT_SIZE={}",
132
0
                    input.size, LZ4_MAX_INPUT_SIZE);
133
0
        }
134
135
44
        std::unique_ptr<Context> context;
136
44
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
137
44
        bool compress_failed = false;
138
44
        Defer defer {[&] {
139
44
            if (!compress_failed) {
140
44
                _release_compression_ctx(std::move(context));
141
44
            }
142
44
        }};
143
144
44
        try {
145
44
            Slice compressed_buf;
146
44
            size_t max_len = max_compressed_len(input.size);
147
44
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
148
                // use output directly
149
0
                output->resize(max_len);
150
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
151
0
                compressed_buf.size = max_len;
152
44
            } else {
153
                // reuse the context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
154
44
                {
155
                    // context->buffer is reusable between queries, so it should be accounted to the
156
                    // global tracker.
157
44
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
158
44
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
159
44
                    context->buffer->resize(max_len);
160
44
                }
161
44
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
162
44
                compressed_buf.size = max_len;
163
44
            }
164
165
44
            size_t compressed_len =
166
44
                    LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data,
167
44
                                               input.size, compressed_buf.size, ACCELARATION);
168
44
            if (compressed_len == 0) {
169
0
                compress_failed = true;
170
0
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
171
0
                                               compressed_buf.size);
172
0
            }
173
44
            output->resize(compressed_len);
174
44
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
175
44
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
176
44
                                    compressed_len);
177
44
            }
178
44
        } catch (...) {
179
            // Do not set compress_failed to release context
180
0
            DCHECK(!compress_failed);
181
0
            return Status::InternalError("Fail to do LZ4Block compress due to exception");
182
0
        }
183
44
        return Status::OK();
184
44
    }
185
186
30
    Status decompress(const Slice& input, Slice* output) override {
187
30
        auto decompressed_len =
188
30
                LZ4_decompress_safe(input.data, output->data, input.size, output->size);
189
30
        if (decompressed_len < 0) {
190
5
            return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len);
191
5
        }
192
25
        output->size = decompressed_len;
193
25
        return Status::OK();
194
30
    }
195
196
44
    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); }
197
198
private:
199
    // reuse LZ4 compress stream
200
44
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
201
44
        std::lock_guard<std::mutex> l(_ctx_mutex);
202
44
        if (_ctx_pool.empty()) {
203
1
            std::unique_ptr<Context> localCtx = Context::create_unique();
204
1
            if (localCtx.get() == nullptr) {
205
0
                return Status::InvalidArgument("new LZ4 context error");
206
0
            }
207
1
            localCtx->ctx = LZ4_createStream();
208
1
            if (localCtx->ctx == nullptr) {
209
0
                return Status::InvalidArgument("LZ4_createStream error");
210
0
            }
211
1
            out = std::move(localCtx);
212
1
            return Status::OK();
213
1
        }
214
43
        out = std::move(_ctx_pool.back());
215
43
        _ctx_pool.pop_back();
216
43
        return Status::OK();
217
44
    }
218
44
    void _release_compression_ctx(std::unique_ptr<Context> context) {
219
44
        DCHECK(context);
220
44
        LZ4_resetStream(context->ctx);
221
44
        std::lock_guard<std::mutex> l(_ctx_mutex);
222
44
        _ctx_pool.push_back(std::move(context));
223
44
    }
224
225
private:
226
    mutable std::mutex _ctx_mutex;
227
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
228
    static const int32_t ACCELARATION = 1;
229
};
230
231
class HadoopLz4BlockCompression : public Lz4BlockCompression {
232
public:
233
0
    HadoopLz4BlockCompression() {
234
0
        Status st = Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor);
235
0
        if (!st.ok()) {
236
0
            throw Exception(Status::FatalError(
237
0
                    "HadoopLz4BlockCompression construction failed. status = {}", st));
238
0
        }
239
0
    }
240
241
0
    ~HadoopLz4BlockCompression() override = default;
242
243
0
    static HadoopLz4BlockCompression* instance() {
244
0
        static HadoopLz4BlockCompression s_instance;
245
0
        return &s_instance;
246
0
    }
247
248
    // hadoop uses block compression for lz4
249
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
250
0
    Status compress(const Slice& input, faststring* output) override {
251
        // be the same as hadoop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
252
0
        size_t lz4_block_size = config::lz4_compression_block_size;
253
0
        size_t overhead = lz4_block_size / 255 + 16;
254
0
        size_t max_input_size = lz4_block_size - overhead;
255
256
0
        size_t data_len = input.size;
257
0
        char* data = input.data;
258
0
        std::vector<OwnedSlice> buffers;
259
0
        size_t out_len = 0;
260
261
0
        while (data_len > 0) {
262
0
            size_t input_size = std::min(data_len, max_input_size);
263
0
            Slice input_slice(data, input_size);
264
0
            faststring output_data;
265
0
            RETURN_IF_ERROR(Lz4BlockCompression::compress(input_slice, &output_data));
266
0
            out_len += output_data.size();
267
0
            buffers.push_back(output_data.build());
268
0
            data += input_size;
269
0
            data_len -= input_size;
270
0
        }
271
272
        // hadoop block compression: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
273
0
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
274
0
        output->resize(total_output_len);
275
0
        char* output_buffer = (char*)output->data();
276
0
        BigEndian::Store32(output_buffer, input.get_size());
277
0
        output_buffer += 4;
278
0
        for (const auto& buffer : buffers) {
279
0
            auto slice = buffer.slice();
280
0
            BigEndian::Store32(output_buffer, slice.get_size());
281
0
            output_buffer += 4;
282
0
            memcpy(output_buffer, slice.get_data(), slice.get_size());
283
0
            output_buffer += slice.get_size();
284
0
        }
285
286
0
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);
287
288
0
        return Status::OK();
289
0
    }
290
291
0
    Status decompress(const Slice& input, Slice* output) override {
292
0
        size_t input_bytes_read = 0;
293
0
        size_t decompressed_len = 0;
294
0
        size_t more_input_bytes = 0;
295
0
        size_t more_output_bytes = 0;
296
0
        bool stream_end = false;
297
0
        auto st = _decompressor->decompress((uint8_t*)input.data, input.size, &input_bytes_read,
298
0
                                            (uint8_t*)output->data, output->size, &decompressed_len,
299
0
                                            &stream_end, &more_input_bytes, &more_output_bytes);
300
        // try to decompress using hadoopLz4; if that fails, fall back to lz4.
301
0
        return (st != Status::OK() || stream_end != true)
302
0
                       ? Lz4BlockCompression::decompress(input, output)
303
0
                       : Status::OK();
304
0
    }
305
306
private:
307
    std::unique_ptr<Decompressor> _decompressor;
308
};
309
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
310
class Lz4fBlockCompression : public BlockCompressionCodec {
311
private:
312
    class CContext {
313
        ENABLE_FACTORY_CREATOR(CContext);
314
315
    public:
316
1
        CContext() : ctx(nullptr) {
317
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
318
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
319
1
            buffer = std::make_unique<faststring>();
320
1
        }
321
        LZ4F_compressionContext_t ctx;
322
        std::unique_ptr<faststring> buffer;
323
1
        ~CContext() {
324
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
325
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
326
1
            if (ctx) {
327
1
                LZ4F_freeCompressionContext(ctx);
328
1
            }
329
1
            buffer.reset();
330
1
        }
331
    };
332
    class DContext {
333
        ENABLE_FACTORY_CREATOR(DContext);
334
335
    public:
336
6
        DContext() : ctx(nullptr) {}
337
        LZ4F_decompressionContext_t ctx;
338
6
        ~DContext() {
339
6
            if (ctx) {
340
6
                LZ4F_freeDecompressionContext(ctx);
341
6
            }
342
6
        }
343
    };
344
345
public:
346
25.0k
    static Lz4fBlockCompression* instance() {
347
25.0k
        static Lz4fBlockCompression s_instance;
348
25.0k
        return &s_instance;
349
25.0k
    }
350
1
    ~Lz4fBlockCompression() {
351
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
352
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
353
1
        _ctx_c_pool.clear();
354
1
        _ctx_d_pool.clear();
355
1
    }
356
357
5
    Status compress(const Slice& input, faststring* output) override {
358
5
        std::vector<Slice> inputs {input};
359
5
        return compress(inputs, input.size, output);
360
5
    }
361
362
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
363
15.6k
                    faststring* output) override {
364
15.6k
        return _compress(inputs, uncompressed_size, output);
365
15.6k
    }
366
367
6.75k
    Status decompress(const Slice& input, Slice* output) override {
368
6.75k
        return _decompress(input, output);
369
6.75k
    }
370
371
15.6k
    size_t max_compressed_len(size_t len) override {
372
15.6k
        return std::max(LZ4F_compressBound(len, &_s_preferences),
373
15.6k
                        LZ4F_compressFrameBound(len, &_s_preferences));
374
15.6k
    }
375
376
private:
377
    Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
378
15.6k
                     faststring* output) {
379
15.6k
        std::unique_ptr<CContext> context;
380
15.6k
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
381
15.6k
        bool compress_failed = false;
382
15.6k
        Defer defer {[&] {
383
15.6k
            if (!compress_failed) {
384
15.6k
                _release_compression_ctx(std::move(context));
385
15.6k
            }
386
15.6k
        }};
387
388
15.6k
        try {
389
15.6k
            Slice compressed_buf;
390
15.6k
            size_t max_len = max_compressed_len(uncompressed_size);
391
15.6k
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
392
                // use output directly
393
0
                output->resize(max_len);
394
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
395
0
                compressed_buf.size = max_len;
396
15.6k
            } else {
397
15.6k
                {
398
15.6k
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
399
15.6k
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
400
                    // reuse the context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
401
15.6k
                    context->buffer->resize(max_len);
402
15.6k
                }
403
15.6k
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
404
15.6k
                compressed_buf.size = max_len;
405
15.6k
            }
406
407
15.6k
            auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
408
15.6k
                                             &_s_preferences);
409
15.6k
            if (LZ4F_isError(wbytes)) {
410
0
                compress_failed = true;
411
0
                return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
412
0
                                               LZ4F_getErrorName(wbytes));
413
0
            }
414
15.6k
            size_t offset = wbytes;
415
15.6k
            for (auto input : inputs) {
416
15.6k
                wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
417
15.6k
                                             compressed_buf.size - offset, input.data, input.size,
418
15.6k
                                             nullptr);
419
15.6k
                if (LZ4F_isError(wbytes)) {
420
0
                    compress_failed = true;
421
0
                    return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
422
0
                                                   LZ4F_getErrorName(wbytes));
423
0
                }
424
15.6k
                offset += wbytes;
425
15.6k
            }
426
15.6k
            wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
427
15.6k
                                      compressed_buf.size - offset, nullptr);
428
15.6k
            if (LZ4F_isError(wbytes)) {
429
0
                compress_failed = true;
430
0
                return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
431
0
                                               LZ4F_getErrorName(wbytes));
432
0
            }
433
15.6k
            offset += wbytes;
434
15.6k
            output->resize(offset);
435
15.6k
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
436
15.6k
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
437
15.6k
            }
438
15.6k
        } catch (...) {
439
            // Do not set compress_failed to release context
440
0
            DCHECK(!compress_failed);
441
0
            return Status::InternalError("Fail to do LZ4F compress due to exception");
442
0
        }
443
444
15.6k
        return Status::OK();
445
15.6k
    }
446
447
6.75k
    Status _decompress(const Slice& input, Slice* output) {
448
6.75k
        bool decompress_failed = false;
449
6.75k
        std::unique_ptr<DContext> context;
450
6.75k
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
451
6.75k
        Defer defer {[&] {
452
6.75k
            if (!decompress_failed) {
453
6.74k
                _release_decompression_ctx(std::move(context));
454
6.74k
            }
455
6.75k
        }};
456
6.75k
        size_t input_size = input.size;
457
6.75k
        auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data,
458
6.75k
                                    &input_size, nullptr);
459
6.75k
        if (LZ4F_isError(lres)) {
460
0
            decompress_failed = true;
461
0
            return Status::InternalError("Fail to do LZ4F decompress, res={}",
462
0
                                         LZ4F_getErrorName(lres));
463
6.75k
        } else if (input_size != input.size) {
464
0
            decompress_failed = true;
465
0
            return Status::InvalidArgument(
466
0
                    strings::Substitute("Fail to do LZ4F decompress: trailing data left in "
467
0
                                        "compressed data, read=$0 vs given=$1",
468
0
                                        input_size, input.size));
469
6.75k
        } else if (lres != 0) {
470
5
            decompress_failed = true;
471
5
            return Status::InvalidArgument(
472
5
                    "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres);
473
5
        }
474
6.74k
        return Status::OK();
475
6.75k
    }
476
477
private:
478
    // acquire a compression ctx from pool, release while finish compress,
479
    // delete if compression failed
480
15.6k
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
481
15.6k
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
482
15.6k
        if (_ctx_c_pool.empty()) {
483
1
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
484
1
            if (localCtx.get() == nullptr) {
485
0
                return Status::InvalidArgument("failed to new LZ4F CContext");
486
0
            }
487
1
            auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION);
488
1
            if (LZ4F_isError(res) != 0) {
489
0
                return Status::InvalidArgument(strings::Substitute(
490
0
                        "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res)));
491
0
            }
492
1
            out = std::move(localCtx);
493
1
            return Status::OK();
494
1
        }
495
15.6k
        out = std::move(_ctx_c_pool.back());
496
15.6k
        _ctx_c_pool.pop_back();
497
15.6k
        return Status::OK();
498
15.6k
    }
499
15.6k
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
500
15.6k
        DCHECK(context);
501
15.6k
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
502
15.6k
        _ctx_c_pool.push_back(std::move(context));
503
15.6k
    }
504
505
6.75k
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
506
6.75k
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
507
6.75k
        if (_ctx_d_pool.empty()) {
508
6
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
509
6
            if (localCtx.get() == nullptr) {
510
0
                return Status::InvalidArgument("failed to new LZ4F DContext");
511
0
            }
512
6
            auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION);
513
6
            if (LZ4F_isError(res) != 0) {
514
0
                return Status::InvalidArgument(strings::Substitute(
515
0
                        "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res)));
516
0
            }
517
6
            out = std::move(localCtx);
518
6
            return Status::OK();
519
6
        }
520
6.74k
        out = std::move(_ctx_d_pool.back());
521
6.74k
        _ctx_d_pool.pop_back();
522
6.74k
        return Status::OK();
523
6.75k
    }
524
6.74k
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
525
6.74k
        DCHECK(context);
526
        // reset decompression context to avoid ERROR_maxBlockSize_invalid
527
6.74k
        LZ4F_resetDecompressionContext(context->ctx);
528
6.74k
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
529
6.74k
        _ctx_d_pool.push_back(std::move(context));
530
6.74k
    }
531
532
private:
533
    static LZ4F_preferences_t _s_preferences;
534
535
    std::mutex _ctx_c_mutex;
536
    // LZ4F_compressionContext_t is a pointer so no copy here
537
    std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
538
539
    std::mutex _ctx_d_mutex;
540
    std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
541
};
542
543
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
544
        {LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0ULL, 0U,
545
         LZ4F_noBlockChecksum},
546
        0,
547
        0u,
548
        0u,
549
        {0u, 0u, 0u}};
550
551
class Lz4HCBlockCompression : public BlockCompressionCodec {
552
private:
553
    class Context {
554
        ENABLE_FACTORY_CREATOR(Context);
555
556
    public:
557
1
        Context() : ctx(nullptr) {
558
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
559
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
560
1
            buffer = std::make_unique<faststring>();
561
1
        }
562
        LZ4_streamHC_t* ctx;
563
        std::unique_ptr<faststring> buffer;
564
1
        ~Context() {
565
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
566
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
567
1
            if (ctx) {
568
1
                LZ4_freeStreamHC(ctx);
569
1
            }
570
1
            buffer.reset();
571
1
        }
572
    };
573
574
public:
575
2
    static Lz4HCBlockCompression* instance() {
576
2
        static Lz4HCBlockCompression s_instance;
577
2
        return &s_instance;
578
2
    }
579
1
    ~Lz4HCBlockCompression() {
580
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
581
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
582
1
        _ctx_pool.clear();
583
1
    }
584
585
6
    Status compress(const Slice& input, faststring* output) override {
586
6
        std::unique_ptr<Context> context;
587
6
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
588
6
        bool compress_failed = false;
589
6
        Defer defer {[&] {
590
6
            if (!compress_failed) {
591
6
                _release_compression_ctx(std::move(context));
592
6
            }
593
6
        }};
594
595
6
        try {
596
6
            Slice compressed_buf;
597
6
            size_t max_len = max_compressed_len(input.size);
598
6
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
599
                // use output directly
600
0
                output->resize(max_len);
601
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
602
0
                compressed_buf.size = max_len;
603
6
            } else {
604
6
                {
605
6
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
606
6
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
607
                    // reuse the context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
608
6
                    context->buffer->resize(max_len);
609
6
                }
610
6
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
611
6
                compressed_buf.size = max_len;
612
6
            }
613
614
6
            size_t compressed_len = LZ4_compress_HC_continue(
615
6
                    context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size);
616
6
            if (compressed_len == 0) {
617
0
                compress_failed = true;
618
0
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
619
0
                                               compressed_buf.size);
620
0
            }
621
6
            output->resize(compressed_len);
622
6
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
623
6
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
624
6
                                    compressed_len);
625
6
            }
626
6
        } catch (...) {
627
            // Do not set compress_failed to release context
628
0
            DCHECK(!compress_failed);
629
0
            return Status::InternalError("Fail to do LZ4HC compress due to exception");
630
0
        }
631
6
        return Status::OK();
632
6
    }
633
634
11
    Status decompress(const Slice& input, Slice* output) override {
635
11
        auto decompressed_len =
636
11
                LZ4_decompress_safe(input.data, output->data, input.size, output->size);
637
11
        if (decompressed_len < 0) {
638
5
            return Status::InvalidArgument(
639
5
                    "destination buffer is not large enough or the source stream is detected "
640
5
                    "malformed, fail to do LZ4 decompress, error={}",
641
5
                    decompressed_len);
642
5
        }
643
6
        output->size = decompressed_len;
644
6
        return Status::OK();
645
11
    }
646
647
6
    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); }
648
649
private:
650
6
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
651
6
        std::lock_guard<std::mutex> l(_ctx_mutex);
652
6
        if (_ctx_pool.empty()) {
653
1
            std::unique_ptr<Context> localCtx = Context::create_unique();
654
1
            if (localCtx.get() == nullptr) {
655
0
                return Status::InvalidArgument("new LZ4HC context error");
656
0
            }
657
1
            localCtx->ctx = LZ4_createStreamHC();
658
1
            if (localCtx->ctx == nullptr) {
659
0
                return Status::InvalidArgument("LZ4_createStreamHC error");
660
0
            }
661
1
            out = std::move(localCtx);
662
1
            return Status::OK();
663
1
        }
664
5
        out = std::move(_ctx_pool.back());
665
5
        _ctx_pool.pop_back();
666
5
        return Status::OK();
667
6
    }
668
6
    void _release_compression_ctx(std::unique_ptr<Context> context) {
669
6
        DCHECK(context);
670
6
        LZ4_resetStreamHC_fast(context->ctx, _compression_level);
671
6
        std::lock_guard<std::mutex> l(_ctx_mutex);
672
6
        _ctx_pool.push_back(std::move(context));
673
6
    }
674
675
private:
676
    int64_t _compression_level = config::LZ4_HC_compression_level;
677
    mutable std::mutex _ctx_mutex;
678
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
679
};
680
681
// Adapter that exposes a vector of Slices as a single snappy::Source so that
// snappy can stream-compress scattered buffers without concatenating them.
// Empty slices are dropped at construction time; Peek/Skip then only ever see
// non-empty slices, which keeps their invariants simple.
class SnappySlicesSource : public snappy::Source {
public:
    SnappySlicesSource(const std::vector<Slice>& slices)
            : _available(0), _cur_slice(0), _slice_off(0) {
        for (auto& slice : slices) {
            // We filter empty slices here to avoid complicated processing in
            // Peek/Skip (they assume the current slice always has bytes left).
            if (slice.size == 0) {
                continue;
            }
            _available += slice.size;
            _slices.push_back(slice);
        }
    }
    ~SnappySlicesSource() override {}

    // Return the number of bytes left to read from the source
    size_t Available() const override { return _available; }

    // Peek at the next flat region of the source.  Does not reposition
    // the source.  The returned region is empty iff Available()==0.
    //
    // Returns a pointer to the beginning of the region and store its
    // length in *len.
    //
    // The returned region is valid until the next call to Skip() or
    // until this object is destroyed, whichever occurs first.
    //
    // The returned region may be larger than Available() (for example
    // if this ByteSource is a view on a substring of a larger source).
    // The caller is responsible for ensuring that it only reads the
    // Available() bytes.
    const char* Peek(size_t* len) override {
        if (_available == 0) {
            *len = 0;
            return nullptr;
        }
        // we should assure that *len is not 0 — Skip() advances _cur_slice as
        // soon as a slice is fully consumed, so the remainder is always > 0.
        *len = _slices[_cur_slice].size - _slice_off;
        DCHECK(*len != 0);
        return _slices[_cur_slice].data + _slice_off;
    }

    // Skip the next n bytes.  Invalidates any buffer returned by
    // a previous call to Peek().
    // REQUIRES: Available() >= n
    void Skip(size_t n) override {
        _available -= n;
        while (n > 0) {
            auto left = _slices[_cur_slice].size - _slice_off;
            if (left > n) {
                // n can be digested within the current slice
                _slice_off += n;
                return;
            }
            // Current slice exhausted (left <= n): move to the next one.
            _slice_off = 0;
            _cur_slice++;
            n -= left;
        }
    }

private:
    std::vector<Slice> _slices; // non-empty slices only (see constructor)
    size_t _available;          // total unread bytes across all slices
    size_t _cur_slice;          // index of the slice currently being read
    size_t _slice_off;          // read offset inside _slices[_cur_slice]
};
747
748
class SnappyBlockCompression : public BlockCompressionCodec {
749
public:
750
86
    static SnappyBlockCompression* instance() {
751
86
        static SnappyBlockCompression s_instance;
752
86
        return &s_instance;
753
86
    }
754
1
    ~SnappyBlockCompression() override = default;
755
756
34
    Status compress(const Slice& input, faststring* output) override {
757
34
        size_t max_len = max_compressed_len(input.size);
758
34
        output->resize(max_len);
759
34
        Slice s(*output);
760
761
34
        snappy::RawCompress(input.data, input.size, s.data, &s.size);
762
34
        output->resize(s.size);
763
34
        return Status::OK();
764
34
    }
765
766
76
    Status decompress(const Slice& input, Slice* output) override {
767
76
        if (!snappy::RawUncompress(input.data, input.size, output->data)) {
768
0
            return Status::InvalidArgument("Fail to do Snappy decompress");
769
0
        }
770
        // NOTE: GetUncompressedLength only takes O(1) time
771
76
        snappy::GetUncompressedLength(input.data, input.size, &output->size);
772
76
        return Status::OK();
773
76
    }
774
775
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
776
1
                    faststring* output) override {
777
1
        auto max_len = max_compressed_len(uncompressed_size);
778
1
        output->resize(max_len);
779
780
1
        SnappySlicesSource source(inputs);
781
1
        snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data()));
782
1
        output->resize(snappy::Compress(&source, &sink));
783
1
        return Status::OK();
784
1
    }
785
786
35
    size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); }
787
};
788
789
// Hadoop-compatible snappy framing: the input is cut into chunks, each chunk
// is snappy-compressed independently, and the frames are concatenated with
// big-endian length prefixes (see layout comment below). Only the compress
// direction is implemented here; decompression lives in exec/decompressor.
class HadoopSnappyBlockCompression : public SnappyBlockCompression {
public:
    static HadoopSnappyBlockCompression* instance() {
        static HadoopSnappyBlockCompression s_instance;
        return &s_instance;
    }
    ~HadoopSnappyBlockCompression() override = default;

    // hadoop uses block compression for snappy
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
    Status compress(const Slice& input, faststring* output) override {
        // keep the chunking identical to hadoop: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
        size_t snappy_block_size = config::snappy_compression_block_size;
        // per-chunk worst-case expansion headroom, so a compressed chunk never
        // exceeds snappy_block_size
        size_t overhead = snappy_block_size / 6 + 32;
        size_t max_input_size = snappy_block_size - overhead;

        size_t data_len = input.size;
        char* data = input.data;
        std::vector<OwnedSlice> buffers;
        size_t out_len = 0;

        // Compress chunk by chunk, collecting each chunk's compressed bytes.
        while (data_len > 0) {
            size_t input_size = std::min(data_len, max_input_size);
            Slice input_slice(data, input_size);
            faststring output_data;
            RETURN_IF_ERROR(SnappyBlockCompression::compress(input_slice, &output_data));
            out_len += output_data.size();
            // the OwnedSlice takes ownership of output_data's buffer here
            buffers.push_back(output_data.build());
            data += input_size;
            data_len -= input_size;
        }

        // hadoop block layout: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
        // (all length fields are 4-byte big-endian)
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
        output->resize(total_output_len);
        char* output_buffer = (char*)output->data();
        BigEndian::Store32(output_buffer, input.get_size());
        output_buffer += 4;
        for (const auto& buffer : buffers) {
            auto slice = buffer.slice();
            BigEndian::Store32(output_buffer, slice.get_size());
            output_buffer += 4;
            memcpy(output_buffer, slice.get_data(), slice.get_size());
            output_buffer += slice.get_size();
        }

        // Sanity check: we filled exactly the bytes we reserved.
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);

        return Status::OK();
    }

    Status decompress(const Slice& input, Slice* output) override {
        return Status::InternalError("unimplement: SnappyHadoopBlockCompression::decompress");
    }
};
845
846
class ZlibBlockCompression : public BlockCompressionCodec {
847
public:
848
2
    static ZlibBlockCompression* instance() {
849
2
        static ZlibBlockCompression s_instance;
850
2
        return &s_instance;
851
2
    }
852
1
    ~ZlibBlockCompression() override = default;
853
854
5
    Status compress(const Slice& input, faststring* output) override {
855
5
        size_t max_len = max_compressed_len(input.size);
856
5
        output->resize(max_len);
857
5
        Slice s(*output);
858
859
5
        auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size);
860
5
        if (zres == Z_MEM_ERROR) {
861
0
            throw Exception(Status::MemoryLimitExceeded(fmt::format(
862
0
                    "ZLib compression failed due to memory allocationerror.error = {}, res = {} ",
863
0
                    zError(zres), zres)));
864
5
        } else if (zres != Z_OK) {
865
0
            return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres));
866
0
        }
867
5
        output->resize(s.size);
868
5
        return Status::OK();
869
5
    }
870
871
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
872
1
                    faststring* output) override {
873
1
        size_t max_len = max_compressed_len(uncompressed_size);
874
1
        output->resize(max_len);
875
876
1
        z_stream zstrm;
877
1
        zstrm.zalloc = Z_NULL;
878
1
        zstrm.zfree = Z_NULL;
879
1
        zstrm.opaque = Z_NULL;
880
1
        auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
881
1
        if (zres == Z_MEM_ERROR) {
882
0
            throw Exception(Status::MemoryLimitExceeded(
883
0
                    "Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres));
884
1
        } else if (zres != Z_OK) {
885
0
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
886
0
                                         zError(zres), zres);
887
0
        }
888
        // we assume that output is e
889
1
        zstrm.next_out = (Bytef*)output->data();
890
1
        zstrm.avail_out = output->size();
891
6
        for (int i = 0; i < inputs.size(); ++i) {
892
5
            if (inputs[i].size == 0) {
893
1
                continue;
894
1
            }
895
4
            zstrm.next_in = (Bytef*)inputs[i].data;
896
4
            zstrm.avail_in = inputs[i].size;
897
4
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;
898
899
4
            zres = deflate(&zstrm, flush);
900
4
            if (zres != Z_OK && zres != Z_STREAM_END) {
901
0
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
902
0
                                             zError(zres), zres);
903
0
            }
904
4
        }
905
906
1
        output->resize(zstrm.total_out);
907
1
        zres = deflateEnd(&zstrm);
908
1
        if (zres == Z_DATA_ERROR) {
909
0
            return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres),
910
0
                                           zres);
911
1
        } else if (zres != Z_OK) {
912
0
            return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
913
0
                                         zError(zres), zres);
914
0
        }
915
1
        return Status::OK();
916
1
    }
917
918
14
    Status decompress(const Slice& input, Slice* output) override {
919
14
        size_t input_size = input.size;
920
14
        auto zres =
921
14
                ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
922
14
        if (zres == Z_DATA_ERROR) {
923
1
            return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres));
924
13
        } else if (zres == Z_MEM_ERROR) {
925
0
            throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}",
926
0
                                                        zError(zres)));
927
13
        } else if (zres != Z_OK) {
928
7
            return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres));
929
7
        }
930
6
        return Status::OK();
931
14
    }
932
933
6
    size_t max_compressed_len(size_t len) override {
934
        // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block
935
6
        return len + 6 + 5 * ((len >> 14) + 1);
936
6
    }
937
};
938
939
// Bzip2 codec, compression only (decompress is intentionally unimplemented;
// bzip2 decompression goes through exec/decompressor instead).
class Bzip2BlockCompression : public BlockCompressionCodec {
public:
    static Bzip2BlockCompression* instance() {
        static Bzip2BlockCompression s_instance;
        return &s_instance;
    }
    ~Bzip2BlockCompression() override = default;

    // One-shot compression of a single slice at maximum block size (level 9).
    Status compress(const Slice& input, faststring* output) override {
        size_t max_len = max_compressed_len(input.size);
        output->resize(max_len);
        uint32_t size = output->size();
        auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data,
                                              input.size, 9, 0, 0);
        if (bzres == BZ_MEM_ERROR) {
            throw Exception(
                    Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres));
        } else if (bzres == BZ_PARAM_ERROR) {
            return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres);
        } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
                   bzres != BZ_STREAM_END && bzres != BZ_OK) {
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
        }
        output->resize(size);
        return Status::OK();
    }

    // Streaming compression of several slices into one bz_stream.
    // NOTE(review): if the LAST slice is empty it is skipped by the `continue`
    // below, so BZ_FINISH is never issued and the stream is left unterminated;
    // also, error returns inside the loop do not call BZ2_bzCompressEnd, which
    // leaks the stream's internal state. Same pattern as the zlib codec —
    // verify and fix together.
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        size_t max_len = max_compressed_len(uncompressed_size);
        output->resize(max_len);

        bz_stream bzstrm;
        bzero(&bzstrm, sizeof(bzstrm));
        int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0);
        if (bzres == BZ_PARAM_ERROR) {
            return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
        } else if (bzres == BZ_MEM_ERROR) {
            throw Exception(
                    Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres));
        } else if (bzres != BZ_OK) {
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
        }
        // we assume the output buffer is large enough for the whole stream
        bzstrm.next_out = (char*)output->data();
        bzstrm.avail_out = output->size();
        for (int i = 0; i < inputs.size(); ++i) {
            if (inputs[i].size == 0) {
                continue;
            }
            bzstrm.next_in = (char*)inputs[i].data;
            bzstrm.avail_in = inputs[i].size;
            // finish the stream on the last slice, otherwise keep feeding
            int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN;

            bzres = BZ2_bzCompress(&bzstrm, flush);
            if (bzres == BZ_PARAM_ERROR) {
                return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
            } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
                       bzres != BZ_STREAM_END && bzres != BZ_OK) {
                return Status::InternalError("Failed to init bz2. status code: {}", bzres);
            }
        }

        // bz_stream reports the total output as two 32-bit halves; recombine.
        size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32;
        output->resize(total_out);
        bzres = BZ2_bzCompressEnd(&bzstrm);
        if (bzres == BZ_PARAM_ERROR) {
            return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
        } else if (bzres != BZ_OK) {
            return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
        }
        return Status::OK();
    }

    Status decompress(const Slice& input, Slice* output) override {
        return Status::InternalError("unimplement: Bzip2BlockCompression::decompress");
    }

    size_t max_compressed_len(size_t len) override {
        // TODO: make sure the max_compressed_len for bzip2
        return len * 2;
    }
};
1022
1023
// for ZSTD compression and decompression, with BOTH fast and high compression ratio.
// Compression and decompression contexts are pooled separately (creating a
// ZSTD context is expensive); each pool is guarded by its own mutex. A context
// is only returned to its pool when the operation succeeded — on failure it is
// destroyed via the unique_ptr, since its internal state may be inconsistent.
class ZstdBlockCompression : public BlockCompressionCodec {
private:
    // Pooled compression context: owns a ZSTD_CCtx plus a reusable scratch
    // buffer so small compressions don't allocate a fresh output each time.
    class CContext {
        ENABLE_FACTORY_CREATOR(CContext);

    public:
        CContext() : ctx(nullptr) {
            // account the scratch buffer against the codec's mem tracker
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        ZSTD_CCtx* ctx;
        std::unique_ptr<faststring> buffer;
        ~CContext() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                ZSTD_freeCCtx(ctx);
            }
            buffer.reset();
        }
    };
    // Pooled decompression context: just the ZSTD_DCtx, no scratch buffer.
    class DContext {
        ENABLE_FACTORY_CREATOR(DContext);

    public:
        DContext() : ctx(nullptr) {}
        ZSTD_DCtx* ctx;
        ~DContext() {
            if (ctx) {
                ZSTD_freeDCtx(ctx);
            }
        }
    };

public:
    static ZstdBlockCompression* instance() {
        static ZstdBlockCompression s_instance;
        return &s_instance;
    }
    ~ZstdBlockCompression() {
        // drain both pools under the mem tracker so the freed contexts and
        // buffers are released against the right accounting
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_c_pool.clear();
        _ctx_d_pool.clear();
    }

    size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); }

    Status compress(const Slice& input, faststring* output) override {
        std::vector<Slice> inputs {input};
        return compress(inputs, input.size, output);
    }

    // follow ZSTD official example
    //  https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        std::unique_ptr<CContext> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        // return the context to the pool only on success; a failed context is
        // destroyed when `context` goes out of scope
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            size_t max_len = max_compressed_len(uncompressed_size);
            Slice compressed_buf;
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // too big for the pooled scratch buffer: use output directly
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                {
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            // set compression level to default 3
            auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel,
                                              ZSTD_CLEVEL_DEFAULT);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }
            // set checksum flag to 1: embeds a checksum verified on decompress
            ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }

            ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};

            for (size_t i = 0; i < inputs.size(); i++) {
                ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};

                // flush & finish the frame on the last slice
                bool last_input = (i == inputs.size() - 1);
                auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;

                bool finished = false;
                do {
                    // do compress
                    auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);

                    if (ZSTD_isError(ret)) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 error: {}",
                                                     ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
                    }

                    // ret is ZSTD's hint for remaining work; a nonzero hint with
                    // a full output buffer means we cannot make progress
                    if (ret > 0 && out_buf.pos == out_buf.size) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 output buffer full");
                    }

                    // last slice: loop until the frame is fully flushed (ret == 0);
                    // otherwise: until this slice's input is fully consumed
                    finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
                } while (!finished);
            }

            // set compressed size for caller
            output->resize(out_buf.pos);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // we compressed into the pooled scratch buffer; copy out
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
            }
        } catch (std::exception& e) {
            return Status::InternalError("Fail to do ZSTD compress due to exception {}", e.what());
        } catch (...) {
            // Do not set compress_failed, so the context is still released
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do ZSTD compress due to exception");
        }

        return Status::OK();
    }

    Status decompress(const Slice& input, Slice* output) override {
        std::unique_ptr<DContext> context;
        bool decompress_failed = false;
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
        Defer defer {[&] {
            if (!decompress_failed) {
                _release_decompression_ctx(std::move(context));
            }
        }};

        size_t ret = ZSTD_decompressDCtx(context->ctx, output->data, output->size, input.data,
                                         input.size);
        if (ZSTD_isError(ret)) {
            decompress_failed = true;
            return Status::InternalError("ZSTD_decompressDCtx error: {}",
                                         ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
        }

        // set decompressed size for caller
        output->size = ret;

        return Status::OK();
    }

private:
    // Get a compression context from the pool, creating one on a pool miss.
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        if (_ctx_c_pool.empty()) {
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD CContext");
            }
            localCtx->ctx = ZSTD_createCCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Failed to create ZSTD compress ctx");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_c_pool.back());
        _ctx_c_pool.pop_back();
        return Status::OK();
    }
    // Reset the session and return the context to the pool.
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
        DCHECK(context);
        auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        _ctx_c_pool.push_back(std::move(context));
    }

    // Get a decompression context from the pool, creating one on a pool miss.
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        if (_ctx_d_pool.empty()) {
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD DContext");
            }
            localCtx->ctx = ZSTD_createDCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Fail to init ZSTD decompress context");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_d_pool.back());
        _ctx_d_pool.pop_back();
        return Status::OK();
    }
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
        DCHECK(context);
        // reset ctx to start a new decompress session
        auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        _ctx_d_pool.push_back(std::move(context));
    }

private:
    mutable std::mutex _ctx_c_mutex;
    mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool;

    mutable std::mutex _ctx_d_mutex;
    mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
1255
1256
// Gzip codec built on zlib: same deflate algorithm as ZlibBlockCompression,
// but deflateInit2/inflateInit2 are called with MAX_WBITS + GZIP_CODEC so the
// stream carries a gzip (RFC 1952) wrapper instead of a zlib one.
class GzipBlockCompression : public ZlibBlockCompression {
public:
    static GzipBlockCompression* instance() {
        static GzipBlockCompression s_instance;
        return &s_instance;
    }
    ~GzipBlockCompression() override = default;

    // One-shot gzip compression of a single slice.
    // NOTE(review): the deflate/deflateEnd error returns below leave the
    // z_stream un-ended, leaking zlib's internal state — verify and add
    // deflateEnd on those paths.
    Status compress(const Slice& input, faststring* output) override {
        size_t max_len = max_compressed_len(input.size);
        output->resize(max_len);

        z_stream z_strm = {};
        z_strm.zalloc = Z_NULL;
        z_strm.zfree = Z_NULL;
        z_strm.opaque = Z_NULL;

        // MAX_WBITS + GZIP_CODEC selects the gzip wrapper
        int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
                                8, Z_DEFAULT_STRATEGY);

        if (zres == Z_MEM_ERROR) {
            throw Exception(Status::MemoryLimitExceeded(
                    "Fail to init ZLib compress, error={}, res={}", zError(zres), zres));
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to init ZLib compress, error={}, res={}",
                                         zError(zres), zres);
        }

        z_strm.next_in = (Bytef*)input.get_data();
        z_strm.avail_in = input.get_size();
        z_strm.next_out = (Bytef*)output->data();
        z_strm.avail_out = output->size();

        // single call with Z_FINISH: output space is guaranteed by the bound
        zres = deflate(&z_strm, Z_FINISH);
        if (zres != Z_OK && zres != Z_STREAM_END) {
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
                                         zError(zres), zres);
        }

        output->resize(z_strm.total_out);
        zres = deflateEnd(&z_strm);
        if (zres == Z_DATA_ERROR) {
            return Status::InvalidArgument("Fail to end zlib compress");
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to end zlib compress");
        }
        return Status::OK();
    }

    // Streaming gzip compression of several slices into one stream.
    // NOTE(review): like the zlib variant, an empty LAST slice is skipped by
    // the `continue`, so Z_FINISH is never issued and the stream is left
    // unterminated; error paths also skip deflateEnd — verify and fix.
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        size_t max_len = max_compressed_len(uncompressed_size);
        output->resize(max_len);

        z_stream zstrm;
        zstrm.zalloc = Z_NULL;
        zstrm.zfree = Z_NULL;
        zstrm.opaque = Z_NULL;
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
                                 8, Z_DEFAULT_STRATEGY);
        if (zres == Z_MEM_ERROR) {
            throw Exception(Status::MemoryLimitExceeded(
                    "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres));
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}",
                                         zError(zres), zres);
        }

        // we assume the output buffer is large enough for the whole stream
        zstrm.next_out = (Bytef*)output->data();
        zstrm.avail_out = output->size();
        for (int i = 0; i < inputs.size(); ++i) {
            if (inputs[i].size == 0) {
                continue;
            }
            zstrm.next_in = (Bytef*)inputs[i].data;
            zstrm.avail_in = inputs[i].size;
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;

            zres = deflate(&zstrm, flush);
            if (zres != Z_OK && zres != Z_STREAM_END) {
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
                                             zError(zres), zres);
            }
        }

        output->resize(zstrm.total_out);
        zres = deflateEnd(&zstrm);
        if (zres == Z_DATA_ERROR) {
            return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
                                           zError(zres), zres);
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
                                         zError(zres), zres);
        }
        return Status::OK();
    }

    // One-shot gzip decompression into a caller-provided buffer.
    // Note: output->size is NOT updated to the actual decompressed length
    // here; the caller is expected to supply an exactly sized buffer.
    Status decompress(const Slice& input, Slice* output) override {
        z_stream z_strm = {};
        z_strm.zalloc = Z_NULL;
        z_strm.zfree = Z_NULL;
        z_strm.opaque = Z_NULL;

        int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
        if (ret != Z_OK) {
            return Status::InternalError("Fail to init ZLib decompress, error={}, res={}",
                                         zError(ret), ret);
        }

        // 1. set input and output
        z_strm.next_in = reinterpret_cast<Bytef*>(input.data);
        z_strm.avail_in = input.size;
        z_strm.next_out = reinterpret_cast<Bytef*>(output->data);
        z_strm.avail_out = output->size;

        if (z_strm.avail_out > 0) {
            // We only support the non-streaming use case for block decompressors
            ret = inflate(&z_strm, Z_FINISH);
            if (ret != Z_OK && ret != Z_STREAM_END) {
                (void)inflateEnd(&z_strm);
                if (ret == Z_MEM_ERROR) {
                    throw Exception(Status::MemoryLimitExceeded(
                            "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret));
                } else if (ret == Z_DATA_ERROR) {
                    return Status::InvalidArgument(
                            "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret);
                }
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
                                             zError(ret), ret);
            }
        }
        (void)inflateEnd(&z_strm);

        return Status::OK();
    }

    // Upper bound for gzip output: ask zlib via deflateBound when possible,
    // otherwise fall back to the plain-zlib estimate.
    size_t max_compressed_len(size_t len) override {
        z_stream zstrm;
        zstrm.zalloc = Z_NULL;
        zstrm.zfree = Z_NULL;
        zstrm.opaque = Z_NULL;
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
                                 MEM_LEVEL, Z_DEFAULT_STRATEGY);
        if (zres != Z_OK) {
            // Fall back to zlib estimate logic for deflate, notice this may
            // cause decompress error
            LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres)
                         << ", res=" << zres;
            return ZlibBlockCompression::max_compressed_len(len);
        } else {
            zres = deflateEnd(&zstrm);
            if (zres != Z_OK) {
                LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres)
                             << ", res=" << zres;
            }
            // Mark, maintainer of zlib, has stated that 12 needs to be added to
            // result for gzip
            // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
            // To have a safe upper bound for "wrapper variations", we add 32 to
            // estimate
            int upper_bound = deflateBound(&zstrm, len) + 32;
            return upper_bound;
        }
    }

private:
    // Magic number for zlib, see https://zlib.net/manual.html for more details.
    const static int GZIP_CODEC = 16; // gzip
    // The memLevel parameter specifies how much memory should be allocated for
    // the internal compression state.
    const static int MEM_LEVEL = 8;
};
1429
1430
// Only used on x86 or x86_64
1431
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
1432
        defined(__i386) || defined(_M_IX86)
1433
class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression {
1434
public:
1435
0
    GzipBlockCompressionByLibdeflate() : GzipBlockCompression() {}
1436
0
    static GzipBlockCompressionByLibdeflate* instance() {
1437
0
        static GzipBlockCompressionByLibdeflate s_instance;
1438
0
        return &s_instance;
1439
0
    }
1440
0
    ~GzipBlockCompressionByLibdeflate() override = default;
1441
1442
0
    Status decompress(const Slice& input, Slice* output) override {
1443
0
        if (input.empty()) {
1444
0
            output->size = 0;
1445
0
            return Status::OK();
1446
0
        }
1447
0
        thread_local std::unique_ptr<libdeflate_decompressor, void (*)(libdeflate_decompressor*)>
1448
0
                decompressor {libdeflate_alloc_decompressor(), libdeflate_free_decompressor};
1449
0
        if (!decompressor) {
1450
0
            return Status::InternalError("libdeflate_alloc_decompressor error.");
1451
0
        }
1452
0
        std::size_t out_len;
1453
0
        auto result = libdeflate_gzip_decompress(decompressor.get(), input.data, input.size,
1454
0
                                                 output->data, output->size, &out_len);
1455
0
        if (result != LIBDEFLATE_SUCCESS) {
1456
0
            return Status::InternalError("libdeflate_gzip_decompress error, res={}", result);
1457
0
        }
1458
0
        return Status::OK();
1459
0
    }
1460
};
1461
#endif
1462
1463
class LzoBlockCompression final : public BlockCompressionCodec {
1464
public:
1465
0
    static LzoBlockCompression* instance() {
1466
0
        static LzoBlockCompression s_instance;
1467
0
        return &s_instance;
1468
0
    }
1469
1470
0
    Status compress(const Slice& input, faststring* output) override {
1471
0
        return Status::InvalidArgument("not impl lzo compress.");
1472
0
    }
1473
0
    size_t max_compressed_len(size_t len) override { return 0; };
1474
0
    Status decompress(const Slice& input, Slice* output) override {
1475
0
        auto* input_ptr = input.data;
1476
0
        auto remain_input_size = input.size;
1477
0
        auto* output_ptr = output->data;
1478
0
        auto remain_output_size = output->size;
1479
0
        auto* output_limit = output->data + output->size;
1480
1481
        // Example:
1482
        // OriginData(The original data will be divided into several large data block.) :
1483
        //      large data block1 | large data block2 | large data block3 | ....
1484
        // The large data block will be divided into several small data block.
1485
        // Suppose a large data block is divided into three small blocks:
1486
        // large data block1:            | small block1 | small block2 | small block3 |
1487
        // CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block1) ] [B3 compress(small block1)]>
1488
        //
1489
        // A : original length of the current block of large data block.
1490
        // sizeof(A) = 4 bytes.
1491
        // A = length(small block1) + length(small block2) + length(small block3)
1492
        // Bx : length of  small data block bx.
1493
        // sizeof(Bx) = 4 bytes.
1494
        // Bx = length(compress(small blockx))
1495
0
        try {
1496
0
            while (remain_input_size > 0) {
1497
0
                if (remain_input_size < 4) {
1498
0
                    return Status::InvalidArgument(
1499
0
                            "Need more input buffer to get large_block_uncompressed_len.");
1500
0
                }
1501
1502
0
                uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr);
1503
0
                input_ptr += 4;
1504
0
                remain_input_size -= 4;
1505
1506
0
                if (remain_output_size < large_block_uncompressed_len) {
1507
0
                    return Status::InvalidArgument(
1508
0
                            "Need more output buffer to get uncompressed data.");
1509
0
                }
1510
1511
0
                while (large_block_uncompressed_len > 0) {
1512
0
                    if (remain_input_size < 4) {
1513
0
                        return Status::InvalidArgument(
1514
0
                                "Need more input buffer to get small_block_compressed_len.");
1515
0
                    }
1516
1517
0
                    uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr);
1518
0
                    input_ptr += 4;
1519
0
                    remain_input_size -= 4;
1520
1521
0
                    if (remain_input_size < small_block_compressed_len) {
1522
0
                        return Status::InvalidArgument(
1523
0
                                "Need more input buffer to decompress small block.");
1524
0
                    }
1525
1526
0
                    auto small_block_uncompressed_len =
1527
0
                            orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len,
1528
0
                                               output_ptr, output_limit);
1529
1530
0
                    input_ptr += small_block_compressed_len;
1531
0
                    remain_input_size -= small_block_compressed_len;
1532
1533
0
                    output_ptr += small_block_uncompressed_len;
1534
0
                    large_block_uncompressed_len -= small_block_uncompressed_len;
1535
0
                    remain_output_size -= small_block_uncompressed_len;
1536
0
                }
1537
0
            }
1538
0
        } catch (const orc::ParseError& e) {
1539
            //Prevent be from hanging due to orc::lzoDecompress throw exception
1540
0
            return Status::InternalError("Fail to do LZO decompress, error={}", e.what());
1541
0
        }
1542
0
        return Status::OK();
1543
0
    }
1544
};
1545
1546
class BrotliBlockCompression final : public BlockCompressionCodec {
1547
public:
1548
0
    static BrotliBlockCompression* instance() {
1549
0
        static BrotliBlockCompression s_instance;
1550
0
        return &s_instance;
1551
0
    }
1552
1553
0
    Status compress(const Slice& input, faststring* output) override {
1554
0
        return Status::InvalidArgument("not impl brotli compress.");
1555
0
    }
1556
1557
0
    size_t max_compressed_len(size_t len) override { return 0; };
1558
1559
0
    Status decompress(const Slice& input, Slice* output) override {
1560
        // The size of output buffer is always equal to the umcompressed length.
1561
0
        BrotliDecoderResult result = BrotliDecoderDecompress(
1562
0
                input.get_size(), reinterpret_cast<const uint8_t*>(input.get_data()), &output->size,
1563
0
                reinterpret_cast<uint8_t*>(output->data));
1564
0
        if (result != BROTLI_DECODER_RESULT_SUCCESS) {
1565
0
            return Status::InternalError("Brotli decompression failed, result={}", result);
1566
0
        }
1567
0
        return Status::OK();
1568
0
    }
1569
};
1570
1571
// Resolves a segment_v2 compression type to its process-wide singleton codec.
// NO_COMPRESSION intentionally yields a null codec pointer.
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                   BlockCompressionCodec** codec) {
    switch (type) {
    case segment_v2::CompressionTypePB::NO_COMPRESSION:
        *codec = nullptr;
        return Status::OK();
    case segment_v2::CompressionTypePB::SNAPPY:
        *codec = SnappyBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4:
        *codec = Lz4BlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4F:
        *codec = Lz4fBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4HC:
        *codec = Lz4HCBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::ZLIB:
        *codec = ZlibBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::ZSTD:
        *codec = ZstdBlockCompression::instance();
        return Status::OK();
    default:
        return Status::InternalError("unknown compression type({})", type);
    }
}
1601
1602
// this can only be used in hive text write
1603
0
Status get_block_compression_codec(TFileCompressType::type type, BlockCompressionCodec** codec) {
1604
0
    switch (type) {
1605
0
    case TFileCompressType::PLAIN:
1606
0
        *codec = nullptr;
1607
0
        break;
1608
0
    case TFileCompressType::ZLIB:
1609
0
        *codec = ZlibBlockCompression::instance();
1610
0
        break;
1611
0
    case TFileCompressType::GZ:
1612
0
        *codec = GzipBlockCompression::instance();
1613
0
        break;
1614
0
    case TFileCompressType::BZ2:
1615
0
        *codec = Bzip2BlockCompression::instance();
1616
0
        break;
1617
0
    case TFileCompressType::LZ4BLOCK:
1618
0
        *codec = HadoopLz4BlockCompression::instance();
1619
0
        break;
1620
0
    case TFileCompressType::SNAPPYBLOCK:
1621
0
        *codec = HadoopSnappyBlockCompression::instance();
1622
0
        break;
1623
0
    case TFileCompressType::ZSTD:
1624
0
        *codec = ZstdBlockCompression::instance();
1625
0
        break;
1626
0
    default:
1627
0
        return Status::InternalError("unsupport compression type({}) int hive text", type);
1628
0
    }
1629
1630
0
    return Status::OK();
1631
0
}
1632
1633
// Resolves a Parquet codec id to its singleton implementation.
// UNCOMPRESSED intentionally yields a null codec pointer.
Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec,
                                   BlockCompressionCodec** codec) {
    switch (parquet_codec) {
    case tparquet::CompressionCodec::UNCOMPRESSED:
        *codec = nullptr;
        return Status::OK();
    case tparquet::CompressionCodec::SNAPPY:
        *codec = SnappyBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::LZ4_RAW: // we can use LZ4 compression algorithm parse LZ4_RAW
    case tparquet::CompressionCodec::LZ4:
        *codec = HadoopLz4BlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::ZSTD:
        *codec = ZstdBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::GZIP:
// libdeflate-backed gzip is only available on x86 or x86_64
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
        defined(__i386) || defined(_M_IX86)
        *codec = GzipBlockCompressionByLibdeflate::instance();
#else
        *codec = GzipBlockCompression::instance();
#endif
        return Status::OK();
    case tparquet::CompressionCodec::LZO:
        *codec = LzoBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::BROTLI:
        *codec = BrotliBlockCompression::instance();
        return Status::OK();
    default:
        return Status::InternalError("unknown compression type({})", parquet_codec);
    }
}
1670
1671
} // namespace doris