Coverage Report

Created: 2026-04-15 12:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/block_compression.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/block_compression.h"
19
20
#include <bzlib.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <gen_cpp/segment_v2.pb.h>
23
#include <glog/logging.h>
24
25
#include <exception>
26
// Only used on x86 or x86_64
27
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
28
        defined(__i386) || defined(_M_IX86)
29
#include <libdeflate.h>
30
#endif
31
#include <brotli/decode.h>
32
#include <glog/log_severity.h>
33
#include <glog/logging.h>
34
#include <lz4/lz4.h>
35
#include <lz4/lz4frame.h>
36
#include <lz4/lz4hc.h>
37
#include <snappy/snappy-sinksource.h>
38
#include <snappy/snappy.h>
39
#include <zconf.h>
40
#include <zlib.h>
41
#include <zstd.h>
42
#include <zstd_errors.h>
43
44
#include <algorithm>
45
#include <cstdint>
46
#include <limits>
47
#include <mutex>
48
#include <orc/Exceptions.hh>
49
#include <ostream>
50
51
#include "absl/strings/substitute.h"
52
#include "common/config.h"
53
#include "common/factory_creator.h"
54
#include "exec/common/endian.h"
55
#include "runtime/thread_context.h"
56
#include "util/decompressor.h"
57
#include "util/defer_op.h"
58
#include "util/faststring.h"
59
60
namespace orc {
61
/**
62
 * Decompress the bytes in to the output buffer.
63
 * @param inputAddress the start of the input
64
 * @param inputLimit one past the last byte of the input
65
 * @param outputAddress the start of the output buffer
66
 * @param outputLimit one past the last byte of the output buffer
67
 * @result the number of bytes decompressed
68
 */
69
uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
70
                       char* outputLimit);
71
} // namespace orc
72
73
namespace doris {
74
75
// exception safe
76
Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
77
299
                                       faststring* output) {
78
299
    faststring buf;
79
    // we compute total size to avoid more memory copy
80
299
    buf.reserve(uncompressed_size);
81
358
    for (auto& input : inputs) {
82
358
        buf.append(input.data, input.size);
83
358
    }
84
299
    return compress(buf, output);
85
299
}
86
87
26.0k
bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) {
88
26.0k
    return uncompressed_size > std::numeric_limits<int32_t>::max();
89
26.0k
}
90
91
class Lz4BlockCompression : public BlockCompressionCodec {
92
private:
93
    class Context {
94
        ENABLE_FACTORY_CREATOR(Context);
95
96
    public:
97
1
        Context() : ctx(nullptr) {
98
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
99
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
100
1
            buffer = std::make_unique<faststring>();
101
1
        }
102
        LZ4_stream_t* ctx;
103
        std::unique_ptr<faststring> buffer;
104
1
        ~Context() {
105
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
106
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
107
1
            if (ctx) {
108
1
                LZ4_freeStream(ctx);
109
1
            }
110
1
            buffer.reset();
111
1
        }
112
    };
113
114
public:
115
569
    static Lz4BlockCompression* instance() {
116
569
        static Lz4BlockCompression s_instance;
117
569
        return &s_instance;
118
569
    }
119
1
    ~Lz4BlockCompression() override {
120
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
121
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
122
1
        _ctx_pool.clear();
123
1
    }
124
125
350
    Status compress(const Slice& input, faststring* output) override {
126
350
        if (input.size > LZ4_MAX_INPUT_SIZE) {
127
0
            return Status::InvalidArgument(
128
0
                    "LZ4 not support those case(input.size>LZ4_MAX_INPUT_SIZE), maybe you should "
129
0
                    "change "
130
0
                    "fragment_transmission_compression_codec to snappy, input.size={}, "
131
0
                    "LZ4_MAX_INPUT_SIZE={}",
132
0
                    input.size, LZ4_MAX_INPUT_SIZE);
133
0
        }
134
135
350
        std::unique_ptr<Context> context;
136
350
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
137
350
        bool compress_failed = false;
138
350
        Defer defer {[&] {
139
350
            if (!compress_failed) {
140
350
                _release_compression_ctx(std::move(context));
141
350
            }
142
350
        }};
143
144
350
        try {
145
350
            Slice compressed_buf;
146
350
            size_t max_len = max_compressed_len(input.size);
147
350
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
148
                // use output directly
149
0
                output->resize(max_len);
150
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
151
0
                compressed_buf.size = max_len;
152
350
            } else {
153
                // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
154
350
                {
155
                    // context->buffer is reusable between queries, so it should be accounted to
156
                    // the global tracker.
157
350
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
158
350
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
159
350
                    context->buffer->resize(max_len);
160
350
                }
161
350
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
162
350
                compressed_buf.size = max_len;
163
350
            }
164
165
            // input.size is already checked before;
166
            // compressed_buf.size is got from max_compressed_len, which is
167
            // the return value of LZ4_compressBound, so it is safe to cast to int
168
350
            size_t compressed_len = LZ4_compress_fast_continue(
169
350
                    context->ctx, input.data, compressed_buf.data, static_cast<int>(input.size),
170
350
                    static_cast<int>(compressed_buf.size), ACCELARATION);
171
350
            if (compressed_len == 0) {
172
0
                compress_failed = true;
173
0
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
174
0
                                               compressed_buf.size);
175
0
            }
176
350
            output->resize(compressed_len);
177
350
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
178
350
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
179
350
                                    compressed_len);
180
350
            }
181
350
        } catch (...) {
182
            // Do not set compress_failed to release context
183
0
            DCHECK(!compress_failed);
184
0
            return Status::InternalError("Fail to do LZ4Block compress due to exception");
185
0
        }
186
350
        return Status::OK();
187
350
    }
188
189
131
    Status decompress(const Slice& input, Slice* output) override {
190
131
        auto decompressed_len = LZ4_decompress_safe(
191
131
                input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size));
192
131
        if (decompressed_len < 0) {
193
5
            return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len);
194
5
        }
195
126
        output->size = decompressed_len;
196
126
        return Status::OK();
197
131
    }
198
199
350
    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); }
200
201
private:
202
    // reuse LZ4 compress stream
203
350
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
204
350
        std::lock_guard<std::mutex> l(_ctx_mutex);
205
350
        if (_ctx_pool.empty()) {
206
1
            std::unique_ptr<Context> localCtx = Context::create_unique();
207
1
            if (localCtx.get() == nullptr) {
208
0
                return Status::InvalidArgument("new LZ4 context error");
209
0
            }
210
1
            localCtx->ctx = LZ4_createStream();
211
1
            if (localCtx->ctx == nullptr) {
212
0
                return Status::InvalidArgument("LZ4_createStream error");
213
0
            }
214
1
            out = std::move(localCtx);
215
1
            return Status::OK();
216
1
        }
217
349
        out = std::move(_ctx_pool.back());
218
349
        _ctx_pool.pop_back();
219
349
        return Status::OK();
220
350
    }
221
350
    void _release_compression_ctx(std::unique_ptr<Context> context) {
222
350
        DCHECK(context);
223
350
        LZ4_resetStream(context->ctx);
224
350
        std::lock_guard<std::mutex> l(_ctx_mutex);
225
350
        _ctx_pool.push_back(std::move(context));
226
350
    }
227
228
private:
229
    mutable std::mutex _ctx_mutex;
230
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
231
    static const int32_t ACCELARATION = 1;
232
};
233
234
class HadoopLz4BlockCompression : public Lz4BlockCompression {
235
public:
236
0
    HadoopLz4BlockCompression() {
237
0
        Status st = Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor);
238
0
        if (!st.ok()) {
239
0
            throw Exception(Status::FatalError(
240
0
                    "HadoopLz4BlockCompression construction failed. status = {}", st));
241
0
        }
242
0
    }
243
244
0
    ~HadoopLz4BlockCompression() override = default;
245
246
0
    static HadoopLz4BlockCompression* instance() {
247
0
        static HadoopLz4BlockCompression s_instance;
248
0
        return &s_instance;
249
0
    }
250
251
    // hadoop use block compression for lz4
252
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
253
0
    Status compress(const Slice& input, faststring* output) override {
254
        // be the same as hadoop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
255
0
        size_t lz4_block_size = config::lz4_compression_block_size;
256
0
        size_t overhead = lz4_block_size / 255 + 16;
257
0
        size_t max_input_size = lz4_block_size - overhead;
258
259
0
        size_t data_len = input.size;
260
0
        char* data = input.data;
261
0
        std::vector<OwnedSlice> buffers;
262
0
        size_t out_len = 0;
263
264
0
        while (data_len > 0) {
265
0
            size_t input_size = std::min(data_len, max_input_size);
266
0
            Slice input_slice(data, input_size);
267
0
            faststring output_data;
268
0
            RETURN_IF_ERROR(Lz4BlockCompression::compress(input_slice, &output_data));
269
0
            out_len += output_data.size();
270
0
            buffers.push_back(output_data.build());
271
0
            data += input_size;
272
0
            data_len -= input_size;
273
0
        }
274
275
        // hadoop block compression: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
276
0
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
277
0
        output->resize(total_output_len);
278
0
        char* output_buffer = (char*)output->data();
279
0
        BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size()));
280
0
        output_buffer += 4;
281
0
        for (const auto& buffer : buffers) {
282
0
            auto slice = buffer.slice();
283
0
            BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size()));
284
0
            output_buffer += 4;
285
0
            memcpy(output_buffer, slice.get_data(), slice.get_size());
286
0
            output_buffer += slice.get_size();
287
0
        }
288
289
0
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);
290
291
0
        return Status::OK();
292
0
    }
293
294
0
    Status decompress(const Slice& input, Slice* output) override {
295
0
        size_t input_bytes_read = 0;
296
0
        size_t decompressed_len = 0;
297
0
        size_t more_input_bytes = 0;
298
0
        size_t more_output_bytes = 0;
299
0
        bool stream_end = false;
300
0
        auto st = _decompressor->decompress((uint8_t*)input.data, cast_set<uint32_t>(input.size),
301
0
                                            &input_bytes_read, (uint8_t*)output->data,
302
0
                                            cast_set<uint32_t>(output->size), &decompressed_len,
303
0
                                            &stream_end, &more_input_bytes, &more_output_bytes);
304
        // try decompressing with hadoopLz4; if it fails, fall back to lz4.
305
0
        return (st != Status::OK() || stream_end != true)
306
0
                       ? Lz4BlockCompression::decompress(input, output)
307
0
                       : Status::OK();
308
0
    }
309
310
private:
311
    std::unique_ptr<Decompressor> _decompressor;
312
};
313
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
314
class Lz4fBlockCompression : public BlockCompressionCodec {
315
private:
316
    class CContext {
317
        ENABLE_FACTORY_CREATOR(CContext);
318
319
    public:
320
1
        CContext() : ctx(nullptr) {
321
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
322
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
323
1
            buffer = std::make_unique<faststring>();
324
1
        }
325
        LZ4F_compressionContext_t ctx;
326
        std::unique_ptr<faststring> buffer;
327
1
        ~CContext() {
328
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
329
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
330
1
            if (ctx) {
331
1
                LZ4F_freeCompressionContext(ctx);
332
1
            }
333
1
            buffer.reset();
334
1
        }
335
    };
336
    class DContext {
337
        ENABLE_FACTORY_CREATOR(DContext);
338
339
    public:
340
6
        DContext() : ctx(nullptr) {}
341
        LZ4F_decompressionContext_t ctx;
342
6
        ~DContext() {
343
6
            if (ctx) {
344
6
                LZ4F_freeDecompressionContext(ctx);
345
6
            }
346
6
        }
347
    };
348
349
public:
350
31.1k
    static Lz4fBlockCompression* instance() {
351
31.1k
        static Lz4fBlockCompression s_instance;
352
31.1k
        return &s_instance;
353
31.1k
    }
354
1
    ~Lz4fBlockCompression() {
355
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
356
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
357
1
        _ctx_c_pool.clear();
358
1
        _ctx_d_pool.clear();
359
1
    }
360
361
5
    Status compress(const Slice& input, faststring* output) override {
362
5
        std::vector<Slice> inputs {input};
363
5
        return compress(inputs, input.size, output);
364
5
    }
365
366
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
367
25.6k
                    faststring* output) override {
368
25.6k
        return _compress(inputs, uncompressed_size, output);
369
25.6k
    }
370
371
8.51k
    Status decompress(const Slice& input, Slice* output) override {
372
8.51k
        return _decompress(input, output);
373
8.51k
    }
374
375
25.6k
    size_t max_compressed_len(size_t len) override {
376
25.6k
        return std::max(LZ4F_compressBound(len, &_s_preferences),
377
25.6k
                        LZ4F_compressFrameBound(len, &_s_preferences));
378
25.6k
    }
379
380
private:
381
    Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
382
25.6k
                     faststring* output) {
383
25.6k
        std::unique_ptr<CContext> context;
384
25.6k
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
385
25.6k
        bool compress_failed = false;
386
25.6k
        Defer defer {[&] {
387
25.6k
            if (!compress_failed) {
388
25.6k
                _release_compression_ctx(std::move(context));
389
25.6k
            }
390
25.6k
        }};
391
392
25.6k
        try {
393
25.6k
            Slice compressed_buf;
394
25.6k
            size_t max_len = max_compressed_len(uncompressed_size);
395
25.6k
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
396
                // use output directly
397
0
                output->resize(max_len);
398
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
399
0
                compressed_buf.size = max_len;
400
25.6k
            } else {
401
25.6k
                {
402
25.6k
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
403
25.6k
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
404
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
405
25.6k
                    context->buffer->resize(max_len);
406
25.6k
                }
407
25.6k
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
408
25.6k
                compressed_buf.size = max_len;
409
25.6k
            }
410
411
25.6k
            auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
412
25.6k
                                             &_s_preferences);
413
25.6k
            if (LZ4F_isError(wbytes)) {
414
0
                compress_failed = true;
415
0
                return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
416
0
                                               LZ4F_getErrorName(wbytes));
417
0
            }
418
25.6k
            size_t offset = wbytes;
419
26.2k
            for (auto input : inputs) {
420
26.2k
                wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
421
26.2k
                                             compressed_buf.size - offset, input.data, input.size,
422
26.2k
                                             nullptr);
423
26.2k
                if (LZ4F_isError(wbytes)) {
424
0
                    compress_failed = true;
425
0
                    return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
426
0
                                                   LZ4F_getErrorName(wbytes));
427
0
                }
428
26.2k
                offset += wbytes;
429
26.2k
            }
430
25.6k
            wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
431
25.6k
                                      compressed_buf.size - offset, nullptr);
432
25.6k
            if (LZ4F_isError(wbytes)) {
433
0
                compress_failed = true;
434
0
                return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
435
0
                                               LZ4F_getErrorName(wbytes));
436
0
            }
437
25.6k
            offset += wbytes;
438
25.6k
            output->resize(offset);
439
25.6k
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
440
25.6k
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
441
25.6k
            }
442
25.6k
        } catch (...) {
443
            // Do not set compress_failed to release context
444
0
            DCHECK(!compress_failed);
445
0
            return Status::InternalError("Fail to do LZ4F compress due to exception");
446
0
        }
447
448
25.6k
        return Status::OK();
449
25.6k
    }
450
451
8.51k
    Status _decompress(const Slice& input, Slice* output) {
452
8.51k
        bool decompress_failed = false;
453
8.51k
        std::unique_ptr<DContext> context;
454
8.51k
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
455
8.51k
        Defer defer {[&] {
456
8.51k
            if (!decompress_failed) {
457
8.50k
                _release_decompression_ctx(std::move(context));
458
8.50k
            }
459
8.51k
        }};
460
8.51k
        size_t input_size = input.size;
461
8.51k
        auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data,
462
8.51k
                                    &input_size, nullptr);
463
8.51k
        if (LZ4F_isError(lres)) {
464
0
            decompress_failed = true;
465
0
            return Status::InternalError("Fail to do LZ4F decompress, res={}",
466
0
                                         LZ4F_getErrorName(lres));
467
8.51k
        } else if (input_size != input.size) {
468
0
            decompress_failed = true;
469
0
            return Status::InvalidArgument(
470
0
                    absl::Substitute("Fail to do LZ4F decompress: trailing data left in "
471
0
                                     "compressed data, read=$0 vs given=$1",
472
0
                                     input_size, input.size));
473
8.51k
        } else if (lres != 0) {
474
5
            decompress_failed = true;
475
5
            return Status::InvalidArgument(
476
5
                    "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres);
477
5
        }
478
8.50k
        return Status::OK();
479
8.51k
    }
480
481
private:
482
    // acquire a compression ctx from pool, release while finish compress,
483
    // delete if compression failed
484
25.6k
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
485
25.6k
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
486
25.6k
        if (_ctx_c_pool.empty()) {
487
1
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
488
1
            if (localCtx.get() == nullptr) {
489
0
                return Status::InvalidArgument("failed to new LZ4F CContext");
490
0
            }
491
1
            auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION);
492
1
            if (LZ4F_isError(res) != 0) {
493
0
                return Status::InvalidArgument(absl::Substitute(
494
0
                        "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res)));
495
0
            }
496
1
            out = std::move(localCtx);
497
1
            return Status::OK();
498
1
        }
499
25.6k
        out = std::move(_ctx_c_pool.back());
500
25.6k
        _ctx_c_pool.pop_back();
501
25.6k
        return Status::OK();
502
25.6k
    }
503
25.6k
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
504
25.6k
        DCHECK(context);
505
25.6k
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
506
25.6k
        _ctx_c_pool.push_back(std::move(context));
507
25.6k
    }
508
509
8.51k
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
510
8.51k
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
511
8.51k
        if (_ctx_d_pool.empty()) {
512
6
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
513
6
            if (localCtx.get() == nullptr) {
514
0
                return Status::InvalidArgument("failed to new LZ4F DContext");
515
0
            }
516
6
            auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION);
517
6
            if (LZ4F_isError(res) != 0) {
518
0
                return Status::InvalidArgument(absl::Substitute(
519
0
                        "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res)));
520
0
            }
521
6
            out = std::move(localCtx);
522
6
            return Status::OK();
523
6
        }
524
8.50k
        out = std::move(_ctx_d_pool.back());
525
8.50k
        _ctx_d_pool.pop_back();
526
8.50k
        return Status::OK();
527
8.51k
    }
528
8.50k
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
529
8.50k
        DCHECK(context);
530
        // reset decompression context to avoid ERROR_maxBlockSize_invalid
531
8.50k
        LZ4F_resetDecompressionContext(context->ctx);
532
8.50k
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
533
8.50k
        _ctx_d_pool.push_back(std::move(context));
534
8.50k
    }
535
536
private:
537
    static LZ4F_preferences_t _s_preferences;
538
539
    std::mutex _ctx_c_mutex;
540
    // LZ4F_compressionContext_t is a pointer so no copy here
541
    std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
542
543
    std::mutex _ctx_d_mutex;
544
    std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
545
};
546
547
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
548
        {LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0ULL, 0U,
549
         LZ4F_noBlockChecksum},
550
        0,
551
        0u,
552
        0u,
553
        {0u, 0u, 0u}};
554
555
class Lz4HCBlockCompression : public BlockCompressionCodec {
556
private:
557
    class Context {
558
        ENABLE_FACTORY_CREATOR(Context);
559
560
    public:
561
1
        Context() : ctx(nullptr) {
562
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
563
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
564
1
            buffer = std::make_unique<faststring>();
565
1
        }
566
        LZ4_streamHC_t* ctx;
567
        std::unique_ptr<faststring> buffer;
568
1
        ~Context() {
569
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
570
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
571
1
            if (ctx) {
572
1
                LZ4_freeStreamHC(ctx);
573
1
            }
574
1
            buffer.reset();
575
1
        }
576
    };
577
578
public:
579
2
    static Lz4HCBlockCompression* instance() {
580
2
        static Lz4HCBlockCompression s_instance;
581
2
        return &s_instance;
582
2
    }
583
1
    ~Lz4HCBlockCompression() {
584
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
585
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
586
1
        _ctx_pool.clear();
587
1
    }
588
589
6
    Status compress(const Slice& input, faststring* output) override {
590
6
        std::unique_ptr<Context> context;
591
6
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
592
6
        bool compress_failed = false;
593
6
        Defer defer {[&] {
594
6
            if (!compress_failed) {
595
6
                _release_compression_ctx(std::move(context));
596
6
            }
597
6
        }};
598
599
6
        try {
600
6
            Slice compressed_buf;
601
6
            size_t max_len = max_compressed_len(input.size);
602
6
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
603
                // use output directly
604
0
                output->resize(max_len);
605
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
606
0
                compressed_buf.size = max_len;
607
6
            } else {
608
6
                {
609
6
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
610
6
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
611
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
612
6
                    context->buffer->resize(max_len);
613
6
                }
614
6
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
615
6
                compressed_buf.size = max_len;
616
6
            }
617
618
6
            size_t compressed_len = LZ4_compress_HC_continue(
619
6
                    context->ctx, input.data, compressed_buf.data, cast_set<int>(input.size),
620
6
                    static_cast<int>(compressed_buf.size));
621
6
            if (compressed_len == 0) {
622
0
                compress_failed = true;
623
0
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
624
0
                                               compressed_buf.size);
625
0
            }
626
6
            output->resize(compressed_len);
627
6
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
628
6
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
629
6
                                    compressed_len);
630
6
            }
631
6
        } catch (...) {
632
            // Do not set compress_failed to release context
633
0
            DCHECK(!compress_failed);
634
0
            return Status::InternalError("Fail to do LZ4HC compress due to exception");
635
0
        }
636
6
        return Status::OK();
637
6
    }
638
639
11
    Status decompress(const Slice& input, Slice* output) override {
640
11
        auto decompressed_len = LZ4_decompress_safe(
641
11
                input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size));
642
11
        if (decompressed_len < 0) {
643
5
            return Status::InvalidArgument(
644
5
                    "destination buffer is not large enough or the source stream is detected "
645
5
                    "malformed, fail to do LZ4 decompress, error={}",
646
5
                    decompressed_len);
647
5
        }
648
6
        output->size = decompressed_len;
649
6
        return Status::OK();
650
11
    }
651
652
6
    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); }
653
654
private:
655
6
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
656
6
        std::lock_guard<std::mutex> l(_ctx_mutex);
657
6
        if (_ctx_pool.empty()) {
658
1
            std::unique_ptr<Context> localCtx = Context::create_unique();
659
1
            if (localCtx.get() == nullptr) {
660
0
                return Status::InvalidArgument("new LZ4HC context error");
661
0
            }
662
1
            localCtx->ctx = LZ4_createStreamHC();
663
1
            if (localCtx->ctx == nullptr) {
664
0
                return Status::InvalidArgument("LZ4_createStreamHC error");
665
0
            }
666
1
            out = std::move(localCtx);
667
1
            return Status::OK();
668
1
        }
669
5
        out = std::move(_ctx_pool.back());
670
5
        _ctx_pool.pop_back();
671
5
        return Status::OK();
672
6
    }
673
6
    void _release_compression_ctx(std::unique_ptr<Context> context) {
674
6
        DCHECK(context);
675
6
        LZ4_resetStreamHC_fast(context->ctx, static_cast<int>(_compression_level));
676
6
        std::lock_guard<std::mutex> l(_ctx_mutex);
677
6
        _ctx_pool.push_back(std::move(context));
678
6
    }
679
680
private:
681
    int64_t _compression_level = config::LZ4_HC_compression_level;
682
    mutable std::mutex _ctx_mutex;
683
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
684
};
685
686
class SnappySlicesSource : public snappy::Source {
687
public:
688
    SnappySlicesSource(const std::vector<Slice>& slices)
689
5
            : _available(0), _cur_slice(0), _slice_off(0) {
690
9
        for (auto& slice : slices) {
691
            // We filter empty slice here to avoid complicated process
692
9
            if (slice.size == 0) {
693
1
                continue;
694
1
            }
695
8
            _available += slice.size;
696
8
            _slices.push_back(slice);
697
8
        }
698
5
    }
699
5
    ~SnappySlicesSource() override {}
700
701
    // Return the number of bytes left to read from the source
702
5
    size_t Available() const override { return _available; }
703
704
    // Peek at the next flat region of the source.  Does not reposition
705
    // the source.  The returned region is empty iff Available()==0.
706
    //
707
    // Returns a pointer to the beginning of the region and store its
708
    // length in *len.
709
    //
710
    // The returned region is valid until the next call to Skip() or
711
    // until this object is destroyed, whichever occurs first.
712
    //
713
    // The returned region may be larger than Available() (for example
714
    // if this ByteSource is a view on a substring of a larger source).
715
    // The caller is responsible for ensuring that it only reads the
716
    // Available() bytes.
717
23
    const char* Peek(size_t* len) override {
718
23
        if (_available == 0) {
719
0
            *len = 0;
720
0
            return nullptr;
721
0
        }
722
        // we should assure that *len is not 0
723
23
        *len = _slices[_cur_slice].size - _slice_off;
724
23
        DCHECK(*len != 0);
725
23
        return _slices[_cur_slice].data + _slice_off;
726
23
    }
727
728
    // Skip the next n bytes.  Invalidates any buffer returned by
729
    // a previous call to Peek().
730
    // REQUIRES: Available() >= n
731
24
    void Skip(size_t n) override {
732
24
        _available -= n;
733
32
        while (n > 0) {
734
23
            auto left = _slices[_cur_slice].size - _slice_off;
735
23
            if (left > n) {
736
                // n can be digest in current slice
737
15
                _slice_off += n;
738
15
                return;
739
15
            }
740
8
            _slice_off = 0;
741
8
            _cur_slice++;
742
8
            n -= left;
743
8
        }
744
24
    }
745
746
private:
747
    std::vector<Slice> _slices;
748
    size_t _available;
749
    size_t _cur_slice;
750
    size_t _slice_off;
751
};
752
753
class SnappyBlockCompression : public BlockCompressionCodec {
754
public:
755
168
    static SnappyBlockCompression* instance() {
756
168
        static SnappyBlockCompression s_instance;
757
168
        return &s_instance;
758
168
    }
759
1
    ~SnappyBlockCompression() override = default;
760
761
34
    Status compress(const Slice& input, faststring* output) override {
762
34
        size_t max_len = max_compressed_len(input.size);
763
34
        output->resize(max_len);
764
34
        Slice s(*output);
765
766
34
        snappy::RawCompress(input.data, input.size, s.data, &s.size);
767
34
        output->resize(s.size);
768
34
        return Status::OK();
769
34
    }
770
771
79
    Status decompress(const Slice& input, Slice* output) override {
772
79
        if (!snappy::RawUncompress(input.data, input.size, output->data)) {
773
0
            return Status::InvalidArgument("Fail to do Snappy decompress");
774
0
        }
775
        // NOTE: GetUncompressedLength only takes O(1) time
776
79
        snappy::GetUncompressedLength(input.data, input.size, &output->size);
777
79
        return Status::OK();
778
79
    }
779
780
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
781
5
                    faststring* output) override {
782
5
        auto max_len = max_compressed_len(uncompressed_size);
783
5
        output->resize(max_len);
784
785
5
        SnappySlicesSource source(inputs);
786
5
        snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data()));
787
5
        output->resize(snappy::Compress(&source, &sink));
788
5
        return Status::OK();
789
5
    }
790
791
39
    size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); }
792
};
793
794
// Snappy with Hadoop's block framing on top, so the output is readable by
// Hadoop's SnappyCodec. Only compression is implemented.
class HadoopSnappyBlockCompression : public SnappyBlockCompression {
public:
    static HadoopSnappyBlockCompression* instance() {
        static HadoopSnappyBlockCompression s_instance;
        return &s_instance;
    }
    ~HadoopSnappyBlockCompression() override = default;

    // hadoop use block compression for snappy
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
    Status compress(const Slice& input, faststring* output) override {
        // Chunk sizing kept identical to Hadoop's SnappyCodec: each chunk fed to
        // snappy is the configured block size minus a worst-case expansion
        // overhead, so a compressed chunk cannot exceed the block size.
        // https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
        size_t snappy_block_size = config::snappy_compression_block_size;
        size_t overhead = snappy_block_size / 6 + 32;
        size_t max_input_size = snappy_block_size - overhead;

        size_t data_len = input.size;
        char* data = input.data;
        std::vector<OwnedSlice> buffers;
        size_t out_len = 0;

        // Phase 1: compress the input chunk by chunk, keeping each compressed
        // chunk alive until the framed output is assembled below.
        while (data_len > 0) {
            size_t input_size = std::min(data_len, max_input_size);
            Slice input_slice(data, input_size);
            faststring output_data;
            RETURN_IF_ERROR(SnappyBlockCompression::compress(input_slice, &output_data));
            out_len += output_data.size();
            // the OwnedSlice will be moved here
            buffers.push_back(output_data.build());
            data += input_size;
            data_len -= input_size;
        }

        // Phase 2: hadoop block framing (all lengths big-endian uint32):
        // uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
        output->resize(total_output_len);
        char* output_buffer = (char*)output->data();
        BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size()));
        output_buffer += 4;
        for (const auto& buffer : buffers) {
            auto slice = buffer.slice();
            BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size()));
            output_buffer += 4;
            memcpy(output_buffer, slice.get_data(), slice.get_size());
            output_buffer += slice.get_size();
        }

        // Sanity check: we wrote exactly the number of bytes we reserved.
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);

        return Status::OK();
    }

    Status decompress(const Slice& input, Slice* output) override {
        return Status::InternalError("unimplement: SnappyHadoopBlockCompression::decompress");
    }
};
850
851
class ZlibBlockCompression : public BlockCompressionCodec {
852
public:
853
2
    static ZlibBlockCompression* instance() {
854
2
        static ZlibBlockCompression s_instance;
855
2
        return &s_instance;
856
2
    }
857
1
    ~ZlibBlockCompression() override = default;
858
859
5
    Status compress(const Slice& input, faststring* output) override {
860
5
        size_t max_len = max_compressed_len(input.size);
861
5
        output->resize(max_len);
862
5
        Slice s(*output);
863
864
5
        auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size);
865
5
        if (zres == Z_MEM_ERROR) {
866
0
            throw Exception(Status::MemoryLimitExceeded(fmt::format(
867
0
                    "ZLib compression failed due to memory allocationerror.error = {}, res = {} ",
868
0
                    zError(zres), zres)));
869
5
        } else if (zres != Z_OK) {
870
0
            return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres));
871
0
        }
872
5
        output->resize(s.size);
873
5
        return Status::OK();
874
5
    }
875
876
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
877
1
                    faststring* output) override {
878
1
        size_t max_len = max_compressed_len(uncompressed_size);
879
1
        output->resize(max_len);
880
881
1
        z_stream zstrm;
882
1
        zstrm.zalloc = Z_NULL;
883
1
        zstrm.zfree = Z_NULL;
884
1
        zstrm.opaque = Z_NULL;
885
1
        auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
886
1
        if (zres == Z_MEM_ERROR) {
887
0
            throw Exception(Status::MemoryLimitExceeded(
888
0
                    "Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres));
889
1
        } else if (zres != Z_OK) {
890
0
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
891
0
                                         zError(zres), zres);
892
0
        }
893
        // we assume that output is e
894
1
        zstrm.next_out = (Bytef*)output->data();
895
1
        zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size());
896
6
        for (int i = 0; i < inputs.size(); ++i) {
897
5
            if (inputs[i].size == 0) {
898
1
                continue;
899
1
            }
900
4
            zstrm.next_in = (Bytef*)inputs[i].data;
901
4
            zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size);
902
4
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;
903
904
4
            zres = deflate(&zstrm, flush);
905
4
            if (zres != Z_OK && zres != Z_STREAM_END) {
906
0
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
907
0
                                             zError(zres), zres);
908
0
            }
909
4
        }
910
911
1
        output->resize(zstrm.total_out);
912
1
        zres = deflateEnd(&zstrm);
913
1
        if (zres == Z_DATA_ERROR) {
914
0
            return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres),
915
0
                                           zres);
916
1
        } else if (zres != Z_OK) {
917
0
            return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
918
0
                                         zError(zres), zres);
919
0
        }
920
1
        return Status::OK();
921
1
    }
922
923
14
    Status decompress(const Slice& input, Slice* output) override {
924
14
        size_t input_size = input.size;
925
14
        auto zres =
926
14
                ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
927
14
        if (zres == Z_DATA_ERROR) {
928
1
            return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres));
929
13
        } else if (zres == Z_MEM_ERROR) {
930
0
            throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}",
931
0
                                                        zError(zres)));
932
13
        } else if (zres != Z_OK) {
933
7
            return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres));
934
7
        }
935
6
        return Status::OK();
936
14
    }
937
938
6
    size_t max_compressed_len(size_t len) override {
939
        // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block
940
6
        return len + 6 + 5 * ((len >> 14) + 1);
941
6
    }
942
};
943
944
class Bzip2BlockCompression : public BlockCompressionCodec {
945
public:
946
0
    static Bzip2BlockCompression* instance() {
947
0
        static Bzip2BlockCompression s_instance;
948
0
        return &s_instance;
949
0
    }
950
0
    ~Bzip2BlockCompression() override = default;
951
952
0
    Status compress(const Slice& input, faststring* output) override {
953
0
        size_t max_len = max_compressed_len(input.size);
954
0
        output->resize(max_len);
955
0
        auto size = cast_set<uint32_t>(output->size());
956
0
        auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data,
957
0
                                              cast_set<uint32_t>(input.size), 9, 0, 0);
958
0
        if (bzres == BZ_MEM_ERROR) {
959
0
            throw Exception(
960
0
                    Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres));
961
0
        } else if (bzres == BZ_PARAM_ERROR) {
962
0
            return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres);
963
0
        } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
964
0
                   bzres != BZ_STREAM_END && bzres != BZ_OK) {
965
0
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
966
0
        }
967
0
        output->resize(size);
968
0
        return Status::OK();
969
0
    }
970
971
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
972
0
                    faststring* output) override {
973
0
        size_t max_len = max_compressed_len(uncompressed_size);
974
0
        output->resize(max_len);
975
976
0
        bz_stream bzstrm;
977
0
        bzero(&bzstrm, sizeof(bzstrm));
978
0
        int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0);
979
0
        if (bzres == BZ_PARAM_ERROR) {
980
0
            return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
981
0
        } else if (bzres == BZ_MEM_ERROR) {
982
0
            throw Exception(
983
0
                    Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres));
984
0
        } else if (bzres != BZ_OK) {
985
0
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
986
0
        }
987
        // we assume that output is e
988
0
        bzstrm.next_out = (char*)output->data();
989
0
        bzstrm.avail_out = cast_set<uint32_t>(output->size());
990
0
        for (int i = 0; i < inputs.size(); ++i) {
991
0
            if (inputs[i].size == 0) {
992
0
                continue;
993
0
            }
994
0
            bzstrm.next_in = (char*)inputs[i].data;
995
0
            bzstrm.avail_in = cast_set<uint32_t>(inputs[i].size);
996
0
            int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN;
997
998
0
            bzres = BZ2_bzCompress(&bzstrm, flush);
999
0
            if (bzres == BZ_PARAM_ERROR) {
1000
0
                return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
1001
0
            } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
1002
0
                       bzres != BZ_STREAM_END && bzres != BZ_OK) {
1003
0
                return Status::InternalError("Failed to init bz2. status code: {}", bzres);
1004
0
            }
1005
0
        }
1006
1007
0
        size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32;
1008
0
        output->resize(total_out);
1009
0
        bzres = BZ2_bzCompressEnd(&bzstrm);
1010
0
        if (bzres == BZ_PARAM_ERROR) {
1011
0
            return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
1012
0
        } else if (bzres != BZ_OK) {
1013
0
            return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
1014
0
        }
1015
0
        return Status::OK();
1016
0
    }
1017
1018
0
    Status decompress(const Slice& input, Slice* output) override {
1019
0
        return Status::InternalError("unimplement: Bzip2BlockCompression::decompress");
1020
0
    }
1021
1022
0
    size_t max_compressed_len(size_t len) override {
1023
        // TODO: make sure the max_compressed_len for bzip2
1024
        // 50 is an estimate fix overhead for bzip2
1025
        // in case the input len is small and BZ2_bzBuffToBuffCompress will return
1026
        // BZ_OUTBUFF_FULL
1027
0
        return len * 2 + 50;
1028
0
    }
1029
};
1030
1031
// for ZSTD compression and decompression, with BOTH fast and high compression ratio
1032
class ZstdBlockCompression : public BlockCompressionCodec {
private:
    // Pooled compression context: a ZSTD_CCtx plus a reusable scratch buffer
    // for small outputs. Allocation/free is accounted to the block-compression
    // memory tracker.
    class CContext {
        ENABLE_FACTORY_CREATOR(CContext);

    public:
        CContext() : ctx(nullptr) {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        ZSTD_CCtx* ctx;
        std::unique_ptr<faststring> buffer;
        ~CContext() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                ZSTD_freeCCtx(ctx);
            }
            buffer.reset();
        }
    };
    // Pooled decompression context wrapping a ZSTD_DCtx.
    class DContext {
        ENABLE_FACTORY_CREATOR(DContext);

    public:
        DContext() : ctx(nullptr) {}
        ZSTD_DCtx* ctx;
        ~DContext() {
            if (ctx) {
                ZSTD_freeDCtx(ctx);
            }
        }
    };

public:
    static ZstdBlockCompression* instance() {
        static ZstdBlockCompression s_instance;
        return &s_instance;
    }
    ~ZstdBlockCompression() {
        // Account the pooled contexts' teardown to the mem tracker.
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_c_pool.clear();
        _ctx_d_pool.clear();
    }

    size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); }

    Status compress(const Slice& input, faststring* output) override {
        std::vector<Slice> inputs {input};
        return compress(inputs, input.size, output);
    }

    // Stream-compress a list of slices into one ZSTD frame (checksummed,
    // default level). A context that errored mid-stream is deliberately NOT
    // returned to the pool; its unique_ptr frees it on scope exit instead.
    // follow ZSTD official example
    //  https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        std::unique_ptr<CContext> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            size_t max_len = max_compressed_len(uncompressed_size);
            Slice compressed_buf;
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Large result: write straight into the caller's output to
                // avoid keeping an oversized scratch buffer in the pool.
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                {
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            // set compression level to default 3
            auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel,
                                              ZSTD_CLEVEL_DEFAULT);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }
            // set checksum flag to 1 so the frame carries an integrity checksum
            ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }

            ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};

            for (size_t i = 0; i < inputs.size(); i++) {
                ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};

                // The last slice ends the frame (ZSTD_e_end flushes epilogue).
                bool last_input = (i == inputs.size() - 1);
                auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;

                bool finished = false;
                do {
                    // do compress
                    ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);

                    if (ZSTD_isError(ret)) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 error: {}",
                                                     ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
                    }

                    // ret is ZSTD hint for needed output buffer size
                    if (ret > 0 && out_buf.pos == out_buf.size) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 output buffer full");
                    }

                    // For the last slice, loop until the frame epilogue is fully
                    // flushed (ret == 0); otherwise until this input is consumed.
                    finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
                } while (!finished);
            }

            // set compressed size for caller
            output->resize(out_buf.pos);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Result was staged in the pooled scratch buffer; copy it out.
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
            }
        } catch (std::exception& e) {
            return Status::InternalError("Fail to do ZSTD compress due to exception {}", e.what());
        } catch (...) {
            // Do not set compress_failed, so the Defer still releases the context
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do ZSTD compress due to exception");
        }

        return Status::OK();
    }

    // Single-shot decompress. output->size must hold the capacity of
    // output->data on entry; it is set to the actual decompressed length.
    // A context that errored is NOT returned to the pool (freed on scope exit).
    Status decompress(const Slice& input, Slice* output) override {
        std::unique_ptr<DContext> context;
        bool decompress_failed = false;
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
        Defer defer {[&] {
            if (!decompress_failed) {
                _release_decompression_ctx(std::move(context));
            }
        }};

        size_t ret = ZSTD_decompressDCtx(context->ctx, output->data, output->size, input.data,
                                         input.size);
        if (ZSTD_isError(ret)) {
            decompress_failed = true;
            return Status::InternalError("ZSTD_decompressDCtx error: {}",
                                         ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
        }

        // set decompressed size for caller
        output->size = ret;

        return Status::OK();
    }

private:
    // Hand out a pooled compression context, creating one on demand.
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        if (_ctx_c_pool.empty()) {
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD CContext");
            }
            localCtx->ctx = ZSTD_createCCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Failed to create ZSTD compress ctx");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_c_pool.back());
        _ctx_c_pool.pop_back();
        return Status::OK();
    }
    // Reset the session state and return the context to the pool.
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
        DCHECK(context);
        auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        _ctx_c_pool.push_back(std::move(context));
    }

    // Hand out a pooled decompression context, creating one on demand.
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        if (_ctx_d_pool.empty()) {
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD DContext");
            }
            localCtx->ctx = ZSTD_createDCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Fail to init ZSTD decompress context");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_d_pool.back());
        _ctx_d_pool.pop_back();
        return Status::OK();
    }
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
        DCHECK(context);
        // reset ctx to start a new decompress session
        auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        _ctx_d_pool.push_back(std::move(context));
    }

private:
    // Context pools: contexts are expensive to create, so released ones are
    // kept for reuse. Each pool is guarded by its own mutex.
    mutable std::mutex _ctx_c_mutex;
    mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool;

    mutable std::mutex _ctx_d_mutex;
    mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
1263
1264
class GzipBlockCompression : public ZlibBlockCompression {
1265
public:
1266
0
    static GzipBlockCompression* instance() {
1267
0
        static GzipBlockCompression s_instance;
1268
0
        return &s_instance;
1269
0
    }
1270
0
    ~GzipBlockCompression() override = default;
1271
1272
0
    Status compress(const Slice& input, faststring* output) override {
1273
0
        size_t max_len = max_compressed_len(input.size);
1274
0
        output->resize(max_len);
1275
1276
0
        z_stream z_strm = {};
1277
0
        z_strm.zalloc = Z_NULL;
1278
0
        z_strm.zfree = Z_NULL;
1279
0
        z_strm.opaque = Z_NULL;
1280
1281
0
        int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
1282
0
                                8, Z_DEFAULT_STRATEGY);
1283
1284
0
        if (zres == Z_MEM_ERROR) {
1285
0
            throw Exception(Status::MemoryLimitExceeded(
1286
0
                    "Fail to init ZLib compress, error={}, res={}", zError(zres), zres));
1287
0
        } else if (zres != Z_OK) {
1288
0
            return Status::InternalError("Fail to init ZLib compress, error={}, res={}",
1289
0
                                         zError(zres), zres);
1290
0
        }
1291
1292
0
        z_strm.next_in = (Bytef*)input.get_data();
1293
0
        z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.get_size());
1294
0
        z_strm.next_out = (Bytef*)output->data();
1295
0
        z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size());
1296
1297
0
        zres = deflate(&z_strm, Z_FINISH);
1298
0
        if (zres != Z_OK && zres != Z_STREAM_END) {
1299
0
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
1300
0
                                         zError(zres), zres);
1301
0
        }
1302
1303
0
        output->resize(z_strm.total_out);
1304
0
        zres = deflateEnd(&z_strm);
1305
0
        if (zres == Z_DATA_ERROR) {
1306
0
            return Status::InvalidArgument("Fail to end zlib compress");
1307
0
        } else if (zres != Z_OK) {
1308
0
            return Status::InternalError("Fail to end zlib compress");
1309
0
        }
1310
0
        return Status::OK();
1311
0
    }
1312
1313
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
1314
0
                    faststring* output) override {
1315
0
        size_t max_len = max_compressed_len(uncompressed_size);
1316
0
        output->resize(max_len);
1317
1318
0
        z_stream zstrm;
1319
0
        zstrm.zalloc = Z_NULL;
1320
0
        zstrm.zfree = Z_NULL;
1321
0
        zstrm.opaque = Z_NULL;
1322
0
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
1323
0
                                 8, Z_DEFAULT_STRATEGY);
1324
0
        if (zres == Z_MEM_ERROR) {
1325
0
            throw Exception(Status::MemoryLimitExceeded(
1326
0
                    "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres));
1327
0
        } else if (zres != Z_OK) {
1328
0
            return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}",
1329
0
                                         zError(zres), zres);
1330
0
        }
1331
1332
        // we assume that output is e
1333
0
        zstrm.next_out = (Bytef*)output->data();
1334
0
        zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size());
1335
0
        for (int i = 0; i < inputs.size(); ++i) {
1336
0
            if (inputs[i].size == 0) {
1337
0
                continue;
1338
0
            }
1339
0
            zstrm.next_in = (Bytef*)inputs[i].data;
1340
0
            zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size);
1341
0
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;
1342
1343
0
            zres = deflate(&zstrm, flush);
1344
0
            if (zres != Z_OK && zres != Z_STREAM_END) {
1345
0
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
1346
0
                                             zError(zres), zres);
1347
0
            }
1348
0
        }
1349
1350
0
        output->resize(zstrm.total_out);
1351
0
        zres = deflateEnd(&zstrm);
1352
0
        if (zres == Z_DATA_ERROR) {
1353
0
            return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
1354
0
                                           zError(zres), zres);
1355
0
        } else if (zres != Z_OK) {
1356
0
            return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
1357
0
                                         zError(zres), zres);
1358
0
        }
1359
0
        return Status::OK();
1360
0
    }
1361
1362
0
    Status decompress(const Slice& input, Slice* output) override {
1363
0
        z_stream z_strm = {};
1364
0
        z_strm.zalloc = Z_NULL;
1365
0
        z_strm.zfree = Z_NULL;
1366
0
        z_strm.opaque = Z_NULL;
1367
1368
0
        int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
1369
0
        if (ret != Z_OK) {
1370
0
            return Status::InternalError("Fail to init ZLib decompress, error={}, res={}",
1371
0
                                         zError(ret), ret);
1372
0
        }
1373
1374
        // 1. set input and output
1375
0
        z_strm.next_in = reinterpret_cast<Bytef*>(input.data);
1376
0
        z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.size);
1377
0
        z_strm.next_out = reinterpret_cast<Bytef*>(output->data);
1378
0
        z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size);
1379
1380
0
        if (z_strm.avail_out > 0) {
1381
            // We only support non-streaming use case  for block decompressor
1382
0
            ret = inflate(&z_strm, Z_FINISH);
1383
0
            if (ret != Z_OK && ret != Z_STREAM_END) {
1384
0
                (void)inflateEnd(&z_strm);
1385
0
                if (ret == Z_MEM_ERROR) {
1386
0
                    throw Exception(Status::MemoryLimitExceeded(
1387
0
                            "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret));
1388
0
                } else if (ret == Z_DATA_ERROR) {
1389
0
                    return Status::InvalidArgument(
1390
0
                            "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret);
1391
0
                }
1392
0
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
1393
0
                                             zError(ret), ret);
1394
0
            }
1395
0
        }
1396
0
        (void)inflateEnd(&z_strm);
1397
1398
0
        return Status::OK();
1399
0
    }
1400
1401
0
    size_t max_compressed_len(size_t len) override {
1402
0
        z_stream zstrm;
1403
0
        zstrm.zalloc = Z_NULL;
1404
0
        zstrm.zfree = Z_NULL;
1405
0
        zstrm.opaque = Z_NULL;
1406
0
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
1407
0
                                 MEM_LEVEL, Z_DEFAULT_STRATEGY);
1408
0
        if (zres != Z_OK) {
1409
            // Fall back to zlib estimate logic for deflate, notice this may
1410
            // cause decompress error
1411
0
            LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres)
1412
0
                         << ", res=" << zres;
1413
0
            return ZlibBlockCompression::max_compressed_len(len);
1414
0
        } else {
1415
0
            zres = deflateEnd(&zstrm);
1416
0
            if (zres != Z_OK) {
1417
0
                LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres)
1418
0
                             << ", res=" << zres;
1419
0
            }
1420
            // Mark, maintainer of zlib, has stated that 12 needs to be added to
1421
            // result for gzip
1422
            // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
1423
            // To have a safe upper bound for "wrapper variations", we add 32 to
1424
            // estimate
1425
0
            auto upper_bound = deflateBound(&zstrm, len) + 32;
1426
0
            return upper_bound;
1427
0
        }
1428
0
    }
1429
1430
private:
1431
    // Magic number for zlib, see https://zlib.net/manual.html for more details.
1432
    const static int GZIP_CODEC = 16; // gzip
1433
    // The memLevel parameter specifies how much memory should be allocated for
1434
    // the internal compression state.
1435
    const static int MEM_LEVEL = 8;
1436
};
1437
1438
// Only used on x86 or x86_64
1439
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
1440
        defined(__i386) || defined(_M_IX86)
1441
class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression {
1442
public:
1443
0
    GzipBlockCompressionByLibdeflate() : GzipBlockCompression() {}
1444
0
    static GzipBlockCompressionByLibdeflate* instance() {
1445
0
        static GzipBlockCompressionByLibdeflate s_instance;
1446
0
        return &s_instance;
1447
0
    }
1448
0
    ~GzipBlockCompressionByLibdeflate() override = default;
1449
1450
0
    Status decompress(const Slice& input, Slice* output) override {
1451
0
        if (input.empty()) {
1452
0
            output->size = 0;
1453
0
            return Status::OK();
1454
0
        }
1455
0
        thread_local std::unique_ptr<libdeflate_decompressor, void (*)(libdeflate_decompressor*)>
1456
0
                decompressor {libdeflate_alloc_decompressor(), libdeflate_free_decompressor};
1457
0
        if (!decompressor) {
1458
0
            return Status::InternalError("libdeflate_alloc_decompressor error.");
1459
0
        }
1460
0
        std::size_t out_len;
1461
0
        auto result = libdeflate_gzip_decompress(decompressor.get(), input.data, input.size,
1462
0
                                                 output->data, output->size, &out_len);
1463
0
        if (result != LIBDEFLATE_SUCCESS) {
1464
0
            return Status::InternalError("libdeflate_gzip_decompress error, res={}", result);
1465
0
        }
1466
0
        return Status::OK();
1467
0
    }
1468
};
1469
#endif
1470
1471
// Decompress-only LZO codec used for reading externally produced data
// (e.g. parquet LZO pages). Compression is intentionally unimplemented.
class LzoBlockCompression final : public BlockCompressionCodec {
public:
    static LzoBlockCompression* instance() {
        static LzoBlockCompression s_instance;
        return &s_instance;
    }

    // Writing LZO is not supported; always rejects the request.
    Status compress(const Slice& input, faststring* output) override {
        return Status::InvalidArgument("not impl lzo compress.");
    }
    // No compression support, so no meaningful bound; returns 0.
    size_t max_compressed_len(size_t len) override { return 0; };
    // Decompress a stream framed as repeated [A][B1 data1][B2 data2]...
    // (see the Example below). `output` must be pre-sized by the caller to
    // the total uncompressed length; on success the data is written into
    // output->data.
    Status decompress(const Slice& input, Slice* output) override {
        auto* input_ptr = input.data;            // cursor into the compressed stream
        auto remain_input_size = input.size;     // bytes of input not yet consumed
        auto* output_ptr = output->data;         // cursor into the destination buffer
        auto remain_output_size = output->size;  // bytes of output still available
        auto* output_limit = output->data + output->size; // hard bound passed to the ORC decoder

        // Example:
        // OriginData(The original data will be divided into several large data block.) :
        //      large data block1 | large data block2 | large data block3 | ....
        // The large data block will be divided into several small data block.
        // Suppose a large data block is divided into three small blocks:
        // large data block1:            | small block1 | small block2 | small block3 |
        // CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block1) ] [B3 compress(small block1)]>
        //
        // A : original length of the current block of large data block.
        // sizeof(A) = 4 bytes.
        // A = length(small block1) + length(small block2) + length(small block3)
        // Bx : length of  small data block bx.
        // sizeof(Bx) = 4 bytes.
        // Bx = length(compress(small blockx))
        try {
            // Outer loop: one iteration per "large block" in the stream.
            while (remain_input_size > 0) {
                if (remain_input_size < 4) {
                    return Status::InvalidArgument(
                            "Need more input buffer to get large_block_uncompressed_len.");
                }

                // A: total uncompressed size of this large block (big-endian u32).
                uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr);
                input_ptr += 4;
                remain_input_size -= 4;

                // The caller-provided buffer must be able to hold the whole
                // large block before we start decoding its pieces.
                if (remain_output_size < large_block_uncompressed_len) {
                    return Status::InvalidArgument(
                            "Need more output buffer to get uncompressed data.");
                }

                // Inner loop: decode small blocks until this large block's
                // uncompressed byte budget is exhausted.
                while (large_block_uncompressed_len > 0) {
                    if (remain_input_size < 4) {
                        return Status::InvalidArgument(
                                "Need more input buffer to get small_block_compressed_len.");
                    }

                    // Bx: compressed size of the next small block (big-endian u32).
                    uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr);
                    input_ptr += 4;
                    remain_input_size -= 4;

                    if (remain_input_size < small_block_compressed_len) {
                        return Status::InvalidArgument(
                                "Need more input buffer to decompress small block.");
                    }

                    // Delegate the actual LZO decoding to the ORC library;
                    // output_limit guards against writes past the buffer.
                    // Returns the number of uncompressed bytes produced.
                    auto small_block_uncompressed_len =
                            orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len,
                                               output_ptr, output_limit);

                    input_ptr += small_block_compressed_len;
                    remain_input_size -= small_block_compressed_len;

                    // Advance all three output-side counters in lockstep.
                    // NOTE(review): if the decoder ever produced more bytes
                    // than the remaining budget, these unsigned subtractions
                    // would wrap — presumably prevented by output_limit above;
                    // confirm against orc::lzoDecompress's contract.
                    output_ptr += small_block_uncompressed_len;
                    large_block_uncompressed_len -= small_block_uncompressed_len;
                    remain_output_size -= small_block_uncompressed_len;
                }
            }
        } catch (const orc::ParseError& e) {
            //Prevent be from hanging due to orc::lzoDecompress throw exception
            return Status::InternalError("Fail to do LZO decompress, error={}", e.what());
        }
        return Status::OK();
    }
};
1553
1554
class BrotliBlockCompression final : public BlockCompressionCodec {
1555
public:
1556
0
    static BrotliBlockCompression* instance() {
1557
0
        static BrotliBlockCompression s_instance;
1558
0
        return &s_instance;
1559
0
    }
1560
1561
0
    Status compress(const Slice& input, faststring* output) override {
1562
0
        return Status::InvalidArgument("not impl brotli compress.");
1563
0
    }
1564
1565
0
    size_t max_compressed_len(size_t len) override { return 0; };
1566
1567
0
    Status decompress(const Slice& input, Slice* output) override {
1568
        // The size of output buffer is always equal to the umcompressed length.
1569
0
        BrotliDecoderResult result = BrotliDecoderDecompress(
1570
0
                input.get_size(), reinterpret_cast<const uint8_t*>(input.get_data()), &output->size,
1571
0
                reinterpret_cast<uint8_t*>(output->data));
1572
0
        if (result != BROTLI_DECODER_RESULT_SUCCESS) {
1573
0
            return Status::InternalError("Brotli decompression failed, result={}", result);
1574
0
        }
1575
0
        return Status::OK();
1576
0
    }
1577
};
1578
1579
/// Resolve a segment-v2 compression enum to its singleton codec.
///
/// @param type   compression type recorded in the segment footer
/// @param codec  out: the matching codec instance, or nullptr for
///               NO_COMPRESSION
/// @return       InternalError for an unrecognized enum value
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                   BlockCompressionCodec** codec) {
    switch (type) {
    case segment_v2::CompressionTypePB::NO_COMPRESSION:
        *codec = nullptr;
        return Status::OK();
    case segment_v2::CompressionTypePB::SNAPPY:
        *codec = SnappyBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4:
        *codec = Lz4BlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4F:
        *codec = Lz4fBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4HC:
        *codec = Lz4HCBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::ZLIB:
        *codec = ZlibBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::ZSTD:
        *codec = ZstdBlockCompression::instance();
        return Status::OK();
    default:
        return Status::InternalError("unknown compression type({})", type);
    }
}
1609
1610
// this can only be used in hive text write
1611
0
// this can only be used in hive text write
/// Resolve a Thrift file-compression enum (hive text writer) to its codec.
///
/// @param type   requested file compression type
/// @param codec  out: the matching codec instance, or nullptr for PLAIN
/// @return       InternalError for a type hive text writing does not support
Status get_block_compression_codec(TFileCompressType::type type, BlockCompressionCodec** codec) {
    switch (type) {
    case TFileCompressType::PLAIN:
        *codec = nullptr;
        break;
    case TFileCompressType::ZLIB:
        *codec = ZlibBlockCompression::instance();
        break;
    case TFileCompressType::GZ:
        *codec = GzipBlockCompression::instance();
        break;
    case TFileCompressType::BZ2:
        *codec = Bzip2BlockCompression::instance();
        break;
    case TFileCompressType::LZ4BLOCK:
        *codec = HadoopLz4BlockCompression::instance();
        break;
    case TFileCompressType::SNAPPYBLOCK:
        *codec = HadoopSnappyBlockCompression::instance();
        break;
    case TFileCompressType::ZSTD:
        *codec = ZstdBlockCompression::instance();
        break;
    default:
        // BUG FIX: previous message read "unsupport ... int hive text".
        return Status::InternalError("unsupported compression type({}) in hive text", type);
    }

    return Status::OK();
}
1640
1641
/// Resolve a parquet codec enum to the codec used for reading pages.
///
/// @param parquet_codec  codec recorded in the parquet column metadata
/// @param codec          out: the matching codec instance, or nullptr for
///                       UNCOMPRESSED
/// @return               InternalError for an unrecognized enum value
Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec,
                                   BlockCompressionCodec** codec) {
    switch (parquet_codec) {
    case tparquet::CompressionCodec::UNCOMPRESSED:
        *codec = nullptr;
        return Status::OK();
    case tparquet::CompressionCodec::SNAPPY:
        *codec = SnappyBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::LZ4_RAW: // we can use LZ4 compression algorithm parse LZ4_RAW
    case tparquet::CompressionCodec::LZ4:
        *codec = HadoopLz4BlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::ZSTD:
        *codec = ZstdBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::GZIP:
// Only used on x86 or x86_64
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
        defined(__i386) || defined(_M_IX86)
        *codec = GzipBlockCompressionByLibdeflate::instance();
#else
        *codec = GzipBlockCompression::instance();
#endif
        return Status::OK();
    case tparquet::CompressionCodec::LZO:
        *codec = LzoBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::BROTLI:
        *codec = BrotliBlockCompression::instance();
        return Status::OK();
    default:
        return Status::InternalError("unknown compression type({})", parquet_codec);
    }
}
1678
1679
} // namespace doris