Coverage Report

Created: 2026-04-10 04:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/util/block_compression.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/block_compression.h"
19
20
#include <bzlib.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <gen_cpp/segment_v2.pb.h>
23
#include <glog/logging.h>
24
25
#include <exception>
26
// Only used on x86 or x86_64
27
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
28
        defined(__i386) || defined(_M_IX86)
29
#include <libdeflate.h>
30
#endif
31
#include <brotli/decode.h>
32
#include <glog/log_severity.h>
33
#include <glog/logging.h>
34
#include <lz4/lz4.h>
35
#include <lz4/lz4frame.h>
36
#include <lz4/lz4hc.h>
37
#include <snappy/snappy-sinksource.h>
38
#include <snappy/snappy.h>
39
#include <zconf.h>
40
#include <zlib.h>
41
#include <zstd.h>
42
#include <zstd_errors.h>
43
44
#include <algorithm>
45
#include <cstdint>
46
#include <limits>
47
#include <mutex>
48
#include <orc/Exceptions.hh>
49
#include <ostream>
50
51
#include "absl/strings/substitute.h"
52
#include "common/config.h"
53
#include "common/factory_creator.h"
54
#include "exec/common/endian.h"
55
#include "runtime/thread_context.h"
56
#include "util/decompressor.h"
57
#include "util/defer_op.h"
58
#include "util/faststring.h"
59
60
namespace orc {
61
/**
62
 * Decompress the bytes in to the output buffer.
63
 * @param inputAddress the start of the input
64
 * @param inputLimit one past the last byte of the input
65
 * @param outputAddress the start of the output buffer
66
 * @param outputLimit one past the last byte of the output buffer
67
 * @result the number of bytes decompressed
68
 */
69
uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
70
                       char* outputLimit);
71
} // namespace orc
72
73
namespace doris {
74
75
// exception safe
76
Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
77
307
                                       faststring* output) {
78
307
    faststring buf;
79
    // we compute total size to avoid more memory copy
80
307
    buf.reserve(uncompressed_size);
81
366
    for (auto& input : inputs) {
82
366
        buf.append(input.data, input.size);
83
366
    }
84
307
    return compress(buf, output);
85
307
}
86
87
362k
bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) {
88
362k
    return uncompressed_size > std::numeric_limits<int32_t>::max();
89
362k
}
90
91
class Lz4BlockCompression : public BlockCompressionCodec {
92
private:
93
    class Context {
94
        ENABLE_FACTORY_CREATOR(Context);
95
96
    public:
97
23
        Context() : ctx(nullptr) {
98
23
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
99
23
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
100
23
            buffer = std::make_unique<faststring>();
101
23
        }
102
        LZ4_stream_t* ctx;
103
        std::unique_ptr<faststring> buffer;
104
5
        ~Context() {
105
5
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
106
5
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
107
5
            if (ctx) {
108
5
                LZ4_freeStream(ctx);
109
5
            }
110
5
            buffer.reset();
111
5
        }
112
    };
113
114
public:
115
70.7k
    static Lz4BlockCompression* instance() {
116
70.7k
        static Lz4BlockCompression s_instance;
117
70.7k
        return &s_instance;
118
70.7k
    }
119
3
    ~Lz4BlockCompression() override {
120
3
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
121
3
                ExecEnv::GetInstance()->block_compression_mem_tracker());
122
3
        _ctx_pool.clear();
123
3
    }
124
125
33.8k
    Status compress(const Slice& input, faststring* output) override {
126
33.8k
        if (input.size > LZ4_MAX_INPUT_SIZE) {
127
0
            return Status::InvalidArgument(
128
0
                    "LZ4 not support those case(input.size>LZ4_MAX_INPUT_SIZE), maybe you should "
129
0
                    "change "
130
0
                    "fragment_transmission_compression_codec to snappy, input.size={}, "
131
0
                    "LZ4_MAX_INPUT_SIZE={}",
132
0
                    input.size, LZ4_MAX_INPUT_SIZE);
133
0
        }
134
135
33.8k
        std::unique_ptr<Context> context;
136
33.8k
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
137
33.8k
        bool compress_failed = false;
138
33.9k
        Defer defer {[&] {
139
33.9k
            if (!compress_failed) {
140
33.9k
                _release_compression_ctx(std::move(context));
141
33.9k
            }
142
33.9k
        }};
143
144
33.8k
        try {
145
33.8k
            Slice compressed_buf;
146
33.8k
            size_t max_len = max_compressed_len(input.size);
147
33.8k
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
148
                // use output directly
149
1
                output->resize(max_len);
150
1
                compressed_buf.data = reinterpret_cast<char*>(output->data());
151
1
                compressed_buf.size = max_len;
152
33.8k
            } else {
153
                // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
154
33.8k
                {
155
                    // context->buffer is reusable between queries, should be accounted to the
156
                    // global tracker.
157
33.8k
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
158
33.8k
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
159
33.8k
                    context->buffer->resize(max_len);
160
33.8k
                }
161
33.8k
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
162
33.8k
                compressed_buf.size = max_len;
163
33.8k
            }
164
165
            // input.size is already checked before;
166
            // compressed_buf.size is got from max_compressed_len, which is
167
            // the return value of LZ4_compressBound, so it is safe to cast to int
168
33.8k
            size_t compressed_len = LZ4_compress_fast_continue(
169
33.8k
                    context->ctx, input.data, compressed_buf.data, static_cast<int>(input.size),
170
33.8k
                    static_cast<int>(compressed_buf.size), ACCELARATION);
171
33.8k
            if (compressed_len == 0) {
172
0
                compress_failed = true;
173
0
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
174
0
                                               compressed_buf.size);
175
0
            }
176
33.8k
            output->resize(compressed_len);
177
33.9k
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
178
33.9k
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
179
33.9k
                                    compressed_len);
180
33.9k
            }
181
33.8k
        } catch (...) {
182
            // Do not set compress_failed to release context
183
0
            DCHECK(!compress_failed);
184
0
            return Status::InternalError("Fail to do LZ4Block compress due to exception");
185
0
        }
186
33.9k
        return Status::OK();
187
33.8k
    }
188
189
36.8k
    Status decompress(const Slice& input, Slice* output) override {
190
36.8k
        auto decompressed_len = LZ4_decompress_safe(
191
36.8k
                input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size));
192
36.8k
        if (decompressed_len < 0) {
193
5
            return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len);
194
5
        }
195
36.8k
        output->size = decompressed_len;
196
36.8k
        return Status::OK();
197
36.8k
    }
198
199
33.9k
    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); }
200
201
private:
202
    // reuse LZ4 compress stream
203
33.8k
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
204
33.8k
        std::lock_guard<std::mutex> l(_ctx_mutex);
205
33.8k
        if (_ctx_pool.empty()) {
206
23
            std::unique_ptr<Context> localCtx = Context::create_unique();
207
23
            if (localCtx.get() == nullptr) {
208
0
                return Status::InvalidArgument("new LZ4 context error");
209
0
            }
210
23
            localCtx->ctx = LZ4_createStream();
211
23
            if (localCtx->ctx == nullptr) {
212
0
                return Status::InvalidArgument("LZ4_createStream error");
213
0
            }
214
23
            out = std::move(localCtx);
215
23
            return Status::OK();
216
23
        }
217
33.8k
        out = std::move(_ctx_pool.back());
218
33.8k
        _ctx_pool.pop_back();
219
33.8k
        return Status::OK();
220
33.8k
    }
221
33.9k
    void _release_compression_ctx(std::unique_ptr<Context> context) {
222
33.9k
        DCHECK(context);
223
33.9k
        LZ4_resetStream(context->ctx);
224
33.9k
        std::lock_guard<std::mutex> l(_ctx_mutex);
225
33.9k
        _ctx_pool.push_back(std::move(context));
226
33.9k
    }
227
228
private:
229
    mutable std::mutex _ctx_mutex;
230
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
231
    static const int32_t ACCELARATION = 1;
232
};
233
234
class HadoopLz4BlockCompression : public Lz4BlockCompression {
235
public:
236
1
    HadoopLz4BlockCompression() {
237
1
        Status st = Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor);
238
1
        if (!st.ok()) {
239
0
            throw Exception(Status::FatalError(
240
0
                    "HadoopLz4BlockCompression construction failed. status = {}", st));
241
0
        }
242
1
    }
243
244
0
    ~HadoopLz4BlockCompression() override = default;
245
246
8
    static HadoopLz4BlockCompression* instance() {
247
8
        static HadoopLz4BlockCompression s_instance;
248
8
        return &s_instance;
249
8
    }
250
251
    // hadoop uses block compression for lz4
252
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
253
69
    Status compress(const Slice& input, faststring* output) override {
254
        // be the same as hadoop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
255
69
        size_t lz4_block_size = config::lz4_compression_block_size;
256
69
        size_t overhead = lz4_block_size / 255 + 16;
257
69
        size_t max_input_size = lz4_block_size - overhead;
258
259
69
        size_t data_len = input.size;
260
69
        char* data = input.data;
261
69
        std::vector<OwnedSlice> buffers;
262
69
        size_t out_len = 0;
263
264
138
        while (data_len > 0) {
265
69
            size_t input_size = std::min(data_len, max_input_size);
266
69
            Slice input_slice(data, input_size);
267
69
            faststring output_data;
268
69
            RETURN_IF_ERROR(Lz4BlockCompression::compress(input_slice, &output_data));
269
69
            out_len += output_data.size();
270
69
            buffers.push_back(output_data.build());
271
69
            data += input_size;
272
69
            data_len -= input_size;
273
69
        }
274
275
        // hadoop block compression: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
276
69
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
277
69
        output->resize(total_output_len);
278
69
        char* output_buffer = (char*)output->data();
279
69
        BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size()));
280
69
        output_buffer += 4;
281
69
        for (const auto& buffer : buffers) {
282
69
            auto slice = buffer.slice();
283
69
            BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size()));
284
69
            output_buffer += 4;
285
69
            memcpy(output_buffer, slice.get_data(), slice.get_size());
286
69
            output_buffer += slice.get_size();
287
69
        }
288
289
69
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);
290
291
69
        return Status::OK();
292
69
    }
293
294
8
    Status decompress(const Slice& input, Slice* output) override {
295
8
        size_t input_bytes_read = 0;
296
8
        size_t decompressed_len = 0;
297
8
        size_t more_input_bytes = 0;
298
8
        size_t more_output_bytes = 0;
299
8
        bool stream_end = false;
300
8
        auto st = _decompressor->decompress((uint8_t*)input.data, cast_set<uint32_t>(input.size),
301
8
                                            &input_bytes_read, (uint8_t*)output->data,
302
8
                                            cast_set<uint32_t>(output->size), &decompressed_len,
303
8
                                            &stream_end, &more_input_bytes, &more_output_bytes);
304
        // try to decompress using hadoopLz4; if it fails, fall back to lz4.
305
8
        return (st != Status::OK() || stream_end != true)
306
8
                       ? Lz4BlockCompression::decompress(input, output)
307
8
                       : Status::OK();
308
8
    }
309
310
private:
311
    std::unique_ptr<Decompressor> _decompressor;
312
};
313
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
314
class Lz4fBlockCompression : public BlockCompressionCodec {
315
private:
316
    class CContext {
317
        ENABLE_FACTORY_CREATOR(CContext);
318
319
    public:
320
1
        CContext() : ctx(nullptr) {
321
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
322
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
323
1
            buffer = std::make_unique<faststring>();
324
1
        }
325
        LZ4F_compressionContext_t ctx;
326
        std::unique_ptr<faststring> buffer;
327
1
        ~CContext() {
328
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
329
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
330
1
            if (ctx) {
331
1
                LZ4F_freeCompressionContext(ctx);
332
1
            }
333
1
            buffer.reset();
334
1
        }
335
    };
336
    class DContext {
337
        ENABLE_FACTORY_CREATOR(DContext);
338
339
    public:
340
6
        DContext() : ctx(nullptr) {}
341
        LZ4F_decompressionContext_t ctx;
342
6
        ~DContext() {
343
6
            if (ctx) {
344
6
                LZ4F_freeDecompressionContext(ctx);
345
6
            }
346
6
        }
347
    };
348
349
public:
350
31.1k
    static Lz4fBlockCompression* instance() {
351
31.1k
        static Lz4fBlockCompression s_instance;
352
31.1k
        return &s_instance;
353
31.1k
    }
354
1
    ~Lz4fBlockCompression() {
355
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
356
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
357
1
        _ctx_c_pool.clear();
358
1
        _ctx_d_pool.clear();
359
1
    }
360
361
5
    Status compress(const Slice& input, faststring* output) override {
362
5
        std::vector<Slice> inputs {input};
363
5
        return compress(inputs, input.size, output);
364
5
    }
365
366
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
367
25.6k
                    faststring* output) override {
368
25.6k
        return _compress(inputs, uncompressed_size, output);
369
25.6k
    }
370
371
8.51k
    Status decompress(const Slice& input, Slice* output) override {
372
8.51k
        return _decompress(input, output);
373
8.51k
    }
374
375
25.6k
    size_t max_compressed_len(size_t len) override {
376
25.6k
        return std::max(LZ4F_compressBound(len, &_s_preferences),
377
25.6k
                        LZ4F_compressFrameBound(len, &_s_preferences));
378
25.6k
    }
379
380
private:
381
    Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
382
25.6k
                     faststring* output) {
383
25.6k
        std::unique_ptr<CContext> context;
384
25.6k
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
385
25.6k
        bool compress_failed = false;
386
25.6k
        Defer defer {[&] {
387
25.6k
            if (!compress_failed) {
388
25.6k
                _release_compression_ctx(std::move(context));
389
25.6k
            }
390
25.6k
        }};
391
392
25.6k
        try {
393
25.6k
            Slice compressed_buf;
394
25.6k
            size_t max_len = max_compressed_len(uncompressed_size);
395
25.6k
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
396
                // use output directly
397
0
                output->resize(max_len);
398
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
399
0
                compressed_buf.size = max_len;
400
25.6k
            } else {
401
25.6k
                {
402
25.6k
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
403
25.6k
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
404
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
405
25.6k
                    context->buffer->resize(max_len);
406
25.6k
                }
407
25.6k
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
408
25.6k
                compressed_buf.size = max_len;
409
25.6k
            }
410
411
25.6k
            auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
412
25.6k
                                             &_s_preferences);
413
25.6k
            if (LZ4F_isError(wbytes)) {
414
0
                compress_failed = true;
415
0
                return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
416
0
                                               LZ4F_getErrorName(wbytes));
417
0
            }
418
25.6k
            size_t offset = wbytes;
419
26.2k
            for (auto input : inputs) {
420
26.2k
                wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
421
26.2k
                                             compressed_buf.size - offset, input.data, input.size,
422
26.2k
                                             nullptr);
423
26.2k
                if (LZ4F_isError(wbytes)) {
424
0
                    compress_failed = true;
425
0
                    return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
426
0
                                                   LZ4F_getErrorName(wbytes));
427
0
                }
428
26.2k
                offset += wbytes;
429
26.2k
            }
430
25.6k
            wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
431
25.6k
                                      compressed_buf.size - offset, nullptr);
432
25.6k
            if (LZ4F_isError(wbytes)) {
433
0
                compress_failed = true;
434
0
                return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
435
0
                                               LZ4F_getErrorName(wbytes));
436
0
            }
437
25.6k
            offset += wbytes;
438
25.6k
            output->resize(offset);
439
25.6k
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
440
25.6k
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
441
25.6k
            }
442
25.6k
        } catch (...) {
443
            // Do not set compress_failed to release context
444
0
            DCHECK(!compress_failed);
445
0
            return Status::InternalError("Fail to do LZ4F compress due to exception");
446
0
        }
447
448
25.6k
        return Status::OK();
449
25.6k
    }
450
451
8.51k
    Status _decompress(const Slice& input, Slice* output) {
452
8.51k
        bool decompress_failed = false;
453
8.51k
        std::unique_ptr<DContext> context;
454
8.51k
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
455
8.51k
        Defer defer {[&] {
456
8.51k
            if (!decompress_failed) {
457
8.50k
                _release_decompression_ctx(std::move(context));
458
8.50k
            }
459
8.51k
        }};
460
8.51k
        size_t input_size = input.size;
461
8.51k
        auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data,
462
8.51k
                                    &input_size, nullptr);
463
8.51k
        if (LZ4F_isError(lres)) {
464
0
            decompress_failed = true;
465
0
            return Status::InternalError("Fail to do LZ4F decompress, res={}",
466
0
                                         LZ4F_getErrorName(lres));
467
8.51k
        } else if (input_size != input.size) {
468
0
            decompress_failed = true;
469
0
            return Status::InvalidArgument(
470
0
                    absl::Substitute("Fail to do LZ4F decompress: trailing data left in "
471
0
                                     "compressed data, read=$0 vs given=$1",
472
0
                                     input_size, input.size));
473
8.51k
        } else if (lres != 0) {
474
5
            decompress_failed = true;
475
5
            return Status::InvalidArgument(
476
5
                    "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres);
477
5
        }
478
8.50k
        return Status::OK();
479
8.51k
    }
480
481
private:
482
    // acquire a compression ctx from pool, release while finish compress,
483
    // delete if compression failed
484
25.6k
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
485
25.6k
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
486
25.6k
        if (_ctx_c_pool.empty()) {
487
1
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
488
1
            if (localCtx.get() == nullptr) {
489
0
                return Status::InvalidArgument("failed to new LZ4F CContext");
490
0
            }
491
1
            auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION);
492
1
            if (LZ4F_isError(res) != 0) {
493
0
                return Status::InvalidArgument(absl::Substitute(
494
0
                        "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res)));
495
0
            }
496
1
            out = std::move(localCtx);
497
1
            return Status::OK();
498
1
        }
499
25.6k
        out = std::move(_ctx_c_pool.back());
500
25.6k
        _ctx_c_pool.pop_back();
501
25.6k
        return Status::OK();
502
25.6k
    }
503
25.6k
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
504
25.6k
        DCHECK(context);
505
25.6k
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
506
25.6k
        _ctx_c_pool.push_back(std::move(context));
507
25.6k
    }
508
509
8.51k
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
510
8.51k
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
511
8.51k
        if (_ctx_d_pool.empty()) {
512
6
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
513
6
            if (localCtx.get() == nullptr) {
514
0
                return Status::InvalidArgument("failed to new LZ4F DContext");
515
0
            }
516
6
            auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION);
517
6
            if (LZ4F_isError(res) != 0) {
518
0
                return Status::InvalidArgument(absl::Substitute(
519
0
                        "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res)));
520
0
            }
521
6
            out = std::move(localCtx);
522
6
            return Status::OK();
523
6
        }
524
8.50k
        out = std::move(_ctx_d_pool.back());
525
8.50k
        _ctx_d_pool.pop_back();
526
8.50k
        return Status::OK();
527
8.51k
    }
528
8.50k
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
529
8.50k
        DCHECK(context);
530
        // reset decompression context to avoid ERROR_maxBlockSize_invalid
531
8.50k
        LZ4F_resetDecompressionContext(context->ctx);
532
8.50k
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
533
8.50k
        _ctx_d_pool.push_back(std::move(context));
534
8.50k
    }
535
536
private:
537
    static LZ4F_preferences_t _s_preferences;
538
539
    std::mutex _ctx_c_mutex;
540
    // LZ4F_compressionContext_t is a pointer so no copy here
541
    std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
542
543
    std::mutex _ctx_d_mutex;
544
    std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
545
};
546
547
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
548
        {LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0ULL, 0U,
549
         LZ4F_noBlockChecksum},
550
        0,
551
        0u,
552
        0u,
553
        {0u, 0u, 0u}};
554
555
class Lz4HCBlockCompression : public BlockCompressionCodec {
556
private:
557
    class Context {
558
        ENABLE_FACTORY_CREATOR(Context);
559
560
    public:
561
1
        Context() : ctx(nullptr) {
562
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
563
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
564
1
            buffer = std::make_unique<faststring>();
565
1
        }
566
        LZ4_streamHC_t* ctx;
567
        std::unique_ptr<faststring> buffer;
568
1
        ~Context() {
569
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
570
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
571
1
            if (ctx) {
572
1
                LZ4_freeStreamHC(ctx);
573
1
            }
574
1
            buffer.reset();
575
1
        }
576
    };
577
578
public:
579
2
    static Lz4HCBlockCompression* instance() {
580
2
        static Lz4HCBlockCompression s_instance;
581
2
        return &s_instance;
582
2
    }
583
1
    ~Lz4HCBlockCompression() {
584
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
585
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
586
1
        _ctx_pool.clear();
587
1
    }
588
589
6
    Status compress(const Slice& input, faststring* output) override {
590
6
        std::unique_ptr<Context> context;
591
6
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
592
6
        bool compress_failed = false;
593
6
        Defer defer {[&] {
594
6
            if (!compress_failed) {
595
6
                _release_compression_ctx(std::move(context));
596
6
            }
597
6
        }};
598
599
6
        try {
600
6
            Slice compressed_buf;
601
6
            size_t max_len = max_compressed_len(input.size);
602
6
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
603
                // use output directly
604
0
                output->resize(max_len);
605
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
606
0
                compressed_buf.size = max_len;
607
6
            } else {
608
6
                {
609
6
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
610
6
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
611
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE
612
6
                    context->buffer->resize(max_len);
613
6
                }
614
6
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
615
6
                compressed_buf.size = max_len;
616
6
            }
617
618
6
            size_t compressed_len = LZ4_compress_HC_continue(
619
6
                    context->ctx, input.data, compressed_buf.data, cast_set<int>(input.size),
620
6
                    static_cast<int>(compressed_buf.size));
621
6
            if (compressed_len == 0) {
622
0
                compress_failed = true;
623
0
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
624
0
                                               compressed_buf.size);
625
0
            }
626
6
            output->resize(compressed_len);
627
6
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
628
6
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
629
6
                                    compressed_len);
630
6
            }
631
6
        } catch (...) {
632
            // Do not set compress_failed to release context
633
0
            DCHECK(!compress_failed);
634
0
            return Status::InternalError("Fail to do LZ4HC compress due to exception");
635
0
        }
636
6
        return Status::OK();
637
6
    }
638
639
11
    Status decompress(const Slice& input, Slice* output) override {
640
11
        auto decompressed_len = LZ4_decompress_safe(
641
11
                input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size));
642
11
        if (decompressed_len < 0) {
643
5
            return Status::InvalidArgument(
644
5
                    "destination buffer is not large enough or the source stream is detected "
645
5
                    "malformed, fail to do LZ4 decompress, error={}",
646
5
                    decompressed_len);
647
5
        }
648
6
        output->size = decompressed_len;
649
6
        return Status::OK();
650
11
    }
651
652
6
    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); }
653
654
private:
655
6
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
656
6
        std::lock_guard<std::mutex> l(_ctx_mutex);
657
6
        if (_ctx_pool.empty()) {
658
1
            std::unique_ptr<Context> localCtx = Context::create_unique();
659
1
            if (localCtx.get() == nullptr) {
660
0
                return Status::InvalidArgument("new LZ4HC context error");
661
0
            }
662
1
            localCtx->ctx = LZ4_createStreamHC();
663
1
            if (localCtx->ctx == nullptr) {
664
0
                return Status::InvalidArgument("LZ4_createStreamHC error");
665
0
            }
666
1
            out = std::move(localCtx);
667
1
            return Status::OK();
668
1
        }
669
5
        out = std::move(_ctx_pool.back());
670
5
        _ctx_pool.pop_back();
671
5
        return Status::OK();
672
6
    }
673
6
    void _release_compression_ctx(std::unique_ptr<Context> context) {
674
6
        DCHECK(context);
675
6
        LZ4_resetStreamHC_fast(context->ctx, static_cast<int>(_compression_level));
676
6
        std::lock_guard<std::mutex> l(_ctx_mutex);
677
6
        _ctx_pool.push_back(std::move(context));
678
6
    }
679
680
private:
681
    int64_t _compression_level = config::LZ4_HC_compression_level;
682
    mutable std::mutex _ctx_mutex;
683
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
684
};
685
686
class SnappySlicesSource : public snappy::Source {
687
public:
688
    SnappySlicesSource(const std::vector<Slice>& slices)
689
5
            : _available(0), _cur_slice(0), _slice_off(0) {
690
9
        for (auto& slice : slices) {
691
            // We filter empty slice here to avoid complicated process
692
9
            if (slice.size == 0) {
693
1
                continue;
694
1
            }
695
8
            _available += slice.size;
696
8
            _slices.push_back(slice);
697
8
        }
698
5
    }
699
5
    ~SnappySlicesSource() override {}
700
701
    // Return the number of bytes left to read from the source
702
5
    size_t Available() const override { return _available; }
703
704
    // Peek at the next flat region of the source.  Does not reposition
705
    // the source.  The returned region is empty iff Available()==0.
706
    //
707
    // Returns a pointer to the beginning of the region and store its
708
    // length in *len.
709
    //
710
    // The returned region is valid until the next call to Skip() or
711
    // until this object is destroyed, whichever occurs first.
712
    //
713
    // The returned region may be larger than Available() (for example
714
    // if this ByteSource is a view on a substring of a larger source).
715
    // The caller is responsible for ensuring that it only reads the
716
    // Available() bytes.
717
23
    const char* Peek(size_t* len) override {
718
23
        if (_available == 0) {
719
0
            *len = 0;
720
0
            return nullptr;
721
0
        }
722
        // we should assure that *len is not 0
723
23
        *len = _slices[_cur_slice].size - _slice_off;
724
23
        DCHECK(*len != 0);
725
23
        return _slices[_cur_slice].data + _slice_off;
726
23
    }
727
728
    // Skip the next n bytes.  Invalidates any buffer returned by
729
    // a previous call to Peek().
730
    // REQUIRES: Available() >= n
731
24
    void Skip(size_t n) override {
732
24
        _available -= n;
733
32
        while (n > 0) {
734
23
            auto left = _slices[_cur_slice].size - _slice_off;
735
23
            if (left > n) {
736
                // n can be digest in current slice
737
15
                _slice_off += n;
738
15
                return;
739
15
            }
740
8
            _slice_off = 0;
741
8
            _cur_slice++;
742
8
            n -= left;
743
8
        }
744
24
    }
745
746
private:
747
    std::vector<Slice> _slices;
748
    size_t _available;
749
    size_t _cur_slice;
750
    size_t _slice_off;
751
};
752
753
class SnappyBlockCompression : public BlockCompressionCodec {
754
public:
755
65.9k
    static SnappyBlockCompression* instance() {
756
65.9k
        static SnappyBlockCompression s_instance;
757
65.9k
        return &s_instance;
758
65.9k
    }
759
    ~SnappyBlockCompression() override = default;
760
761
31.8k
    Status compress(const Slice& input, faststring* output) override {
762
31.8k
        size_t max_len = max_compressed_len(input.size);
763
31.8k
        output->resize(max_len);
764
31.8k
        Slice s(*output);
765
766
31.8k
        snappy::RawCompress(input.data, input.size, s.data, &s.size);
767
31.8k
        output->resize(s.size);
768
31.8k
        return Status::OK();
769
31.8k
    }
770
771
35.0k
    Status decompress(const Slice& input, Slice* output) override {
772
35.0k
        if (!snappy::RawUncompress(input.data, input.size, output->data)) {
773
0
            return Status::InvalidArgument("Fail to do Snappy decompress");
774
0
        }
775
        // NOTE: GetUncompressedLength only takes O(1) time
776
35.0k
        snappy::GetUncompressedLength(input.data, input.size, &output->size);
777
35.0k
        return Status::OK();
778
35.0k
    }
779
780
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
781
5
                    faststring* output) override {
782
5
        auto max_len = max_compressed_len(uncompressed_size);
783
5
        output->resize(max_len);
784
785
5
        SnappySlicesSource source(inputs);
786
5
        snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data()));
787
5
        output->resize(snappy::Compress(&source, &sink));
788
5
        return Status::OK();
789
5
    }
790
791
31.8k
    size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); }
792
};
793
794
class HadoopSnappyBlockCompression : public SnappyBlockCompression {
795
public:
796
4
    static HadoopSnappyBlockCompression* instance() {
797
4
        static HadoopSnappyBlockCompression s_instance;
798
4
        return &s_instance;
799
4
    }
800
    ~HadoopSnappyBlockCompression() override = default;
801
802
    // hadoop use block compression for snappy
803
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
804
69
    Status compress(const Slice& input, faststring* output) override {
805
        // be same with hadop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
806
69
        size_t snappy_block_size = config::snappy_compression_block_size;
807
69
        size_t overhead = snappy_block_size / 6 + 32;
808
69
        size_t max_input_size = snappy_block_size - overhead;
809
810
69
        size_t data_len = input.size;
811
69
        char* data = input.data;
812
69
        std::vector<OwnedSlice> buffers;
813
69
        size_t out_len = 0;
814
815
138
        while (data_len > 0) {
816
69
            size_t input_size = std::min(data_len, max_input_size);
817
69
            Slice input_slice(data, input_size);
818
69
            faststring output_data;
819
69
            RETURN_IF_ERROR(SnappyBlockCompression::compress(input_slice, &output_data));
820
69
            out_len += output_data.size();
821
            // the OwnedSlice will be moved here
822
69
            buffers.push_back(output_data.build());
823
69
            data += input_size;
824
69
            data_len -= input_size;
825
69
        }
826
827
        // hadoop block compression: umcompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
828
69
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
829
69
        output->resize(total_output_len);
830
69
        char* output_buffer = (char*)output->data();
831
69
        BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size()));
832
69
        output_buffer += 4;
833
69
        for (const auto& buffer : buffers) {
834
69
            auto slice = buffer.slice();
835
69
            BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size()));
836
69
            output_buffer += 4;
837
69
            memcpy(output_buffer, slice.get_data(), slice.get_size());
838
69
            output_buffer += slice.get_size();
839
69
        }
840
841
69
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);
842
843
69
        return Status::OK();
844
69
    }
845
846
0
    Status decompress(const Slice& input, Slice* output) override {
847
0
        return Status::InternalError("unimplement: SnappyHadoopBlockCompression::decompress");
848
0
    }
849
};
850
851
class ZlibBlockCompression : public BlockCompressionCodec {
852
public:
853
47
    static ZlibBlockCompression* instance() {
854
47
        static ZlibBlockCompression s_instance;
855
47
        return &s_instance;
856
47
    }
857
    ~ZlibBlockCompression() override = default;
858
859
21
    Status compress(const Slice& input, faststring* output) override {
860
21
        size_t max_len = max_compressed_len(input.size);
861
21
        output->resize(max_len);
862
21
        Slice s(*output);
863
864
21
        auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size);
865
21
        if (zres == Z_MEM_ERROR) {
866
0
            throw Exception(Status::MemoryLimitExceeded(fmt::format(
867
0
                    "ZLib compression failed due to memory allocationerror.error = {}, res = {} ",
868
0
                    zError(zres), zres)));
869
21
        } else if (zres != Z_OK) {
870
0
            return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres));
871
0
        }
872
21
        output->resize(s.size);
873
21
        return Status::OK();
874
21
    }
875
876
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
877
1
                    faststring* output) override {
878
1
        size_t max_len = max_compressed_len(uncompressed_size);
879
1
        output->resize(max_len);
880
881
1
        z_stream zstrm;
882
1
        zstrm.zalloc = Z_NULL;
883
1
        zstrm.zfree = Z_NULL;
884
1
        zstrm.opaque = Z_NULL;
885
1
        auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
886
1
        if (zres == Z_MEM_ERROR) {
887
0
            throw Exception(Status::MemoryLimitExceeded(
888
0
                    "Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres));
889
1
        } else if (zres != Z_OK) {
890
0
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
891
0
                                         zError(zres), zres);
892
0
        }
893
        // we assume that output is e
894
1
        zstrm.next_out = (Bytef*)output->data();
895
1
        zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size());
896
6
        for (int i = 0; i < inputs.size(); ++i) {
897
5
            if (inputs[i].size == 0) {
898
1
                continue;
899
1
            }
900
4
            zstrm.next_in = (Bytef*)inputs[i].data;
901
4
            zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size);
902
4
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;
903
904
4
            zres = deflate(&zstrm, flush);
905
4
            if (zres != Z_OK && zres != Z_STREAM_END) {
906
0
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
907
0
                                             zError(zres), zres);
908
0
            }
909
4
        }
910
911
1
        output->resize(zstrm.total_out);
912
1
        zres = deflateEnd(&zstrm);
913
1
        if (zres == Z_DATA_ERROR) {
914
0
            return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres),
915
0
                                           zres);
916
1
        } else if (zres != Z_OK) {
917
0
            return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
918
0
                                         zError(zres), zres);
919
0
        }
920
1
        return Status::OK();
921
1
    }
922
923
34
    Status decompress(const Slice& input, Slice* output) override {
924
34
        size_t input_size = input.size;
925
34
        auto zres =
926
34
                ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
927
34
        if (zres == Z_DATA_ERROR) {
928
4
            return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres));
929
30
        } else if (zres == Z_MEM_ERROR) {
930
0
            throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}",
931
0
                                                        zError(zres)));
932
30
        } else if (zres != Z_OK) {
933
7
            return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres));
934
7
        }
935
23
        return Status::OK();
936
34
    }
937
938
22
    size_t max_compressed_len(size_t len) override {
939
        // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block
940
22
        return len + 6 + 5 * ((len >> 14) + 1);
941
22
    }
942
};
943
944
class Bzip2BlockCompression : public BlockCompressionCodec {
945
public:
946
4
    static Bzip2BlockCompression* instance() {
947
4
        static Bzip2BlockCompression s_instance;
948
4
        return &s_instance;
949
4
    }
950
    ~Bzip2BlockCompression() override = default;
951
952
69
    Status compress(const Slice& input, faststring* output) override {
953
69
        size_t max_len = max_compressed_len(input.size);
954
69
        output->resize(max_len);
955
69
        auto size = cast_set<uint32_t>(output->size());
956
69
        auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data,
957
69
                                              cast_set<uint32_t>(input.size), 9, 0, 0);
958
69
        if (bzres == BZ_MEM_ERROR) {
959
0
            throw Exception(
960
0
                    Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres));
961
69
        } else if (bzres == BZ_PARAM_ERROR) {
962
0
            return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres);
963
69
        } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
964
69
                   bzres != BZ_STREAM_END && bzres != BZ_OK) {
965
0
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
966
0
        }
967
69
        output->resize(size);
968
69
        return Status::OK();
969
69
    }
970
971
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
972
0
                    faststring* output) override {
973
0
        size_t max_len = max_compressed_len(uncompressed_size);
974
0
        output->resize(max_len);
975
976
0
        bz_stream bzstrm;
977
0
        bzero(&bzstrm, sizeof(bzstrm));
978
0
        int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0);
979
0
        if (bzres == BZ_PARAM_ERROR) {
980
0
            return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
981
0
        } else if (bzres == BZ_MEM_ERROR) {
982
0
            throw Exception(
983
0
                    Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres));
984
0
        } else if (bzres != BZ_OK) {
985
0
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
986
0
        }
987
        // we assume that output is e
988
0
        bzstrm.next_out = (char*)output->data();
989
0
        bzstrm.avail_out = cast_set<uint32_t>(output->size());
990
0
        for (int i = 0; i < inputs.size(); ++i) {
991
0
            if (inputs[i].size == 0) {
992
0
                continue;
993
0
            }
994
0
            bzstrm.next_in = (char*)inputs[i].data;
995
0
            bzstrm.avail_in = cast_set<uint32_t>(inputs[i].size);
996
0
            int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN;
997
998
0
            bzres = BZ2_bzCompress(&bzstrm, flush);
999
0
            if (bzres == BZ_PARAM_ERROR) {
1000
0
                return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
1001
0
            } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
1002
0
                       bzres != BZ_STREAM_END && bzres != BZ_OK) {
1003
0
                return Status::InternalError("Failed to init bz2. status code: {}", bzres);
1004
0
            }
1005
0
        }
1006
1007
0
        size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32;
1008
0
        output->resize(total_out);
1009
0
        bzres = BZ2_bzCompressEnd(&bzstrm);
1010
0
        if (bzres == BZ_PARAM_ERROR) {
1011
0
            return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
1012
0
        } else if (bzres != BZ_OK) {
1013
0
            return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
1014
0
        }
1015
0
        return Status::OK();
1016
0
    }
1017
1018
0
    Status decompress(const Slice& input, Slice* output) override {
1019
0
        return Status::InternalError("unimplement: Bzip2BlockCompression::decompress");
1020
0
    }
1021
1022
69
    size_t max_compressed_len(size_t len) override {
1023
        // TODO: make sure the max_compressed_len for bzip2
1024
        // 50 is an estimate fix overhead for bzip2
1025
        // in case the input len is small and BZ2_bzBuffToBuffCompress will return
1026
        // BZ_OUTBUFF_FULL
1027
69
        return len * 2 + 50;
1028
69
    }
1029
};
1030
1031
// for ZSTD compression and decompression, with BOTH fast and high compression ratio
// Maintains separate pools of reusable compression (CContext) and
// decompression (DContext) contexts, each guarded by its own mutex.
class ZstdBlockCompression : public BlockCompressionCodec {
private:
    // Pooled compression context: owns a ZSTD_CCtx plus a reusable scratch
    // buffer whose allocations are charged to the block-compression tracker.
    class CContext {
        ENABLE_FACTORY_CREATOR(CContext);

    public:
        CContext() : ctx(nullptr) {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        ZSTD_CCtx* ctx;                     // lazily created by _acquire_compression_ctx
        std::unique_ptr<faststring> buffer; // reused compressed-output scratch space
        ~CContext() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                ZSTD_freeCCtx(ctx);
            }
            // Free the buffer while the tracker switch is still in scope.
            buffer.reset();
        }
    };
    // Pooled decompression context: owns a ZSTD_DCtx.
    class DContext {
        ENABLE_FACTORY_CREATOR(DContext);

    public:
        DContext() : ctx(nullptr) {}
        ZSTD_DCtx* ctx; // lazily created by _acquire_decompression_ctx
        ~DContext() {
            if (ctx) {
                ZSTD_freeDCtx(ctx);
            }
        }
    };

public:
    // Process-wide singleton accessor.
    static ZstdBlockCompression* instance() {
        static ZstdBlockCompression s_instance;
        return &s_instance;
    }
    ~ZstdBlockCompression() {
        // Clear the pools under the tracker so context/buffer frees are
        // accounted against the block-compression tracker.
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_c_pool.clear();
        _ctx_d_pool.clear();
    }

    // Worst-case compressed size as reported by zstd.
    size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); }

    // Single-buffer compression delegates to the multi-slice overload.
    Status compress(const Slice& input, faststring* output) override {
        std::vector<Slice> inputs {input};
        return compress(inputs, input.size, output);
    }

    // follow ZSTD official example
    //  https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
    // Compress several discontiguous buffers as one zstd frame. The context
    // is returned to the pool via Defer unless compression failed, in which
    // case it is dropped (destroyed) rather than reused.
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        std::unique_ptr<CContext> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            size_t max_len = max_compressed_len(uncompressed_size);
            Slice compressed_buf;
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // use output directly
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                {
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            // set compression level to default 3
            auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel,
                                              ZSTD_CLEVEL_DEFAULT);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }
            // set checksum flag to 1
            ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }

            ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};

            // Feed each slice through the streaming compressor; the last
            // slice is ended with ZSTD_e_end to finish the frame.
            for (size_t i = 0; i < inputs.size(); i++) {
                ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};

                bool last_input = (i == inputs.size() - 1);
                auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;

                bool finished = false;
                do {
                    // do compress
                    ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);

                    if (ZSTD_isError(ret)) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 error: {}",
                                                     ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
                    }

                    // ret is ZSTD hint for needed output buffer size
                    if (ret > 0 && out_buf.pos == out_buf.size) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 output buffer full");
                    }

                    // For the final slice, done when zstd reports the frame is
                    // fully flushed (ret == 0); otherwise when all input is read.
                    finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
                } while (!finished);
            }

            // set compressed size for caller
            output->resize(out_buf.pos);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Data was written to the pooled scratch buffer; copy it out.
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
            }
        } catch (std::exception& e) {
            return Status::InternalError("Fail to do ZSTD compress due to exception {}", e.what());
        } catch (...) {
            // Do not set compress_failed to release context
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do ZSTD compress due to exception");
        }

        return Status::OK();
    }

    // Decompress a single zstd frame into the caller-provided buffer; on
    // success output->size is set to the decompressed length.
    Status decompress(const Slice& input, Slice* output) override {
        std::unique_ptr<DContext> context;
        bool decompress_failed = false;
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
        Defer defer {[&] {
            if (!decompress_failed) {
                _release_decompression_ctx(std::move(context));
            }
        }};

        size_t ret = ZSTD_decompressDCtx(context->ctx, output->data, output->size, input.data,
                                         input.size);
        if (ZSTD_isError(ret)) {
            decompress_failed = true;
            return Status::InternalError("ZSTD_decompressDCtx error: {}",
                                         ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
        }

        // set decompressed size for caller
        output->size = ret;

        return Status::OK();
    }

private:
    // Hand out a pooled CContext, creating one (and its ZSTD_CCtx) on demand.
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        if (_ctx_c_pool.empty()) {
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD CContext");
            }
            //typedef LZ4F_cctx* LZ4F_compressionContext_t;
            localCtx->ctx = ZSTD_createCCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Failed to create ZSTD compress ctx");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_c_pool.back());
        _ctx_c_pool.pop_back();
        return Status::OK();
    }
    // Reset the session state and return the context to the pool.
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
        DCHECK(context);
        auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        _ctx_c_pool.push_back(std::move(context));
    }

    // Hand out a pooled DContext, creating one (and its ZSTD_DCtx) on demand.
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        if (_ctx_d_pool.empty()) {
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD DContext");
            }
            localCtx->ctx = ZSTD_createDCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Fail to init ZSTD decompress context");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_d_pool.back());
        _ctx_d_pool.pop_back();
        return Status::OK();
    }
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
        DCHECK(context);
        // reset ctx to start a new decompress session
        auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        _ctx_d_pool.push_back(std::move(context));
    }

private:
    mutable std::mutex _ctx_c_mutex;
    // Idle compression contexts available for reuse (guarded by _ctx_c_mutex).
    mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool;

    mutable std::mutex _ctx_d_mutex;
    // Idle decompression contexts available for reuse (guarded by _ctx_d_mutex).
    mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
1263
1264
class GzipBlockCompression : public ZlibBlockCompression {
1265
public:
1266
    // Process-wide singleton accessor (thread-safe C++11 static init).
    static GzipBlockCompression* instance() {
        static GzipBlockCompression s_instance;
        return &s_instance;
    }
1270
    ~GzipBlockCompression() override = default;
1271
1272
    // Compress one contiguous buffer into gzip format. Differs from the zlib
    // base class by using deflateInit2 with MAX_WBITS + GZIP_CODEC to emit a
    // gzip header/trailer instead of a zlib wrapper.
    Status compress(const Slice& input, faststring* output) override {
        // Pre-size for the worst case so a single deflate(Z_FINISH) suffices.
        size_t max_len = max_compressed_len(input.size);
        output->resize(max_len);

        z_stream z_strm = {};
        z_strm.zalloc = Z_NULL;
        z_strm.zfree = Z_NULL;
        z_strm.opaque = Z_NULL;

        int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
                                8, Z_DEFAULT_STRATEGY);

        if (zres == Z_MEM_ERROR) {
            throw Exception(Status::MemoryLimitExceeded(
                    "Fail to init ZLib compress, error={}, res={}", zError(zres), zres));
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to init ZLib compress, error={}, res={}",
                                         zError(zres), zres);
        }

        z_strm.next_in = (Bytef*)input.get_data();
        z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.get_size());
        z_strm.next_out = (Bytef*)output->data();
        z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size());

        // Single-shot: all input supplied at once, finished immediately.
        zres = deflate(&z_strm, Z_FINISH);
        if (zres != Z_OK && zres != Z_STREAM_END) {
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
                                         zError(zres), zres);
        }

        // Shrink to the actual compressed length before tearing down.
        output->resize(z_strm.total_out);
        zres = deflateEnd(&z_strm);
        if (zres == Z_DATA_ERROR) {
            return Status::InvalidArgument("Fail to end zlib compress");
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to end zlib compress");
        }
        return Status::OK();
    }
1312
1313
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
1314
0
                    faststring* output) override {
1315
0
        size_t max_len = max_compressed_len(uncompressed_size);
1316
0
        output->resize(max_len);
1317
1318
0
        z_stream zstrm;
1319
0
        zstrm.zalloc = Z_NULL;
1320
0
        zstrm.zfree = Z_NULL;
1321
0
        zstrm.opaque = Z_NULL;
1322
0
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
1323
0
                                 8, Z_DEFAULT_STRATEGY);
1324
0
        if (zres == Z_MEM_ERROR) {
1325
0
            throw Exception(Status::MemoryLimitExceeded(
1326
0
                    "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres));
1327
0
        } else if (zres != Z_OK) {
1328
0
            return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}",
1329
0
                                         zError(zres), zres);
1330
0
        }
1331
1332
        // we assume that output is e
1333
0
        zstrm.next_out = (Bytef*)output->data();
1334
0
        zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size());
1335
0
        for (int i = 0; i < inputs.size(); ++i) {
1336
0
            if (inputs[i].size == 0) {
1337
0
                continue;
1338
0
            }
1339
0
            zstrm.next_in = (Bytef*)inputs[i].data;
1340
0
            zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size);
1341
0
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;
1342
1343
0
            zres = deflate(&zstrm, flush);
1344
0
            if (zres != Z_OK && zres != Z_STREAM_END) {
1345
0
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
1346
0
                                             zError(zres), zres);
1347
0
            }
1348
0
        }
1349
1350
0
        output->resize(zstrm.total_out);
1351
0
        zres = deflateEnd(&zstrm);
1352
0
        if (zres == Z_DATA_ERROR) {
1353
0
            return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
1354
0
                                           zError(zres), zres);
1355
0
        } else if (zres != Z_OK) {
1356
0
            return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
1357
0
                                         zError(zres), zres);
1358
0
        }
1359
0
        return Status::OK();
1360
0
    }
1361
1362
0
    // One-shot gzip decompression of `input` into `output`.
    // `output` must already be sized to the exact uncompressed length; this
    // codec does not support streaming (multi-call) decompression.
    //
    // Returns InternalError on init failure, InvalidArgument on corrupt
    // input (Z_DATA_ERROR), and throws Exception(MemoryLimitExceeded) on
    // Z_MEM_ERROR, mirroring the compress path's error policy.
    Status decompress(const Slice& input, Slice* output) override {
        z_stream z_strm = {};
        z_strm.zalloc = Z_NULL;
        z_strm.zfree = Z_NULL;
        z_strm.opaque = Z_NULL;

        // MAX_WBITS + GZIP_CODEC makes inflate expect a gzip wrapper
        // instead of a raw zlib wrapper.
        int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
        if (ret != Z_OK) {
            return Status::InternalError("Fail to init ZLib decompress, error={}, res={}",
                                         zError(ret), ret);
        }

        // 1. set input and output
        z_strm.next_in = reinterpret_cast<Bytef*>(input.data);
        z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.size);
        z_strm.next_out = reinterpret_cast<Bytef*>(output->data);
        z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size);

        if (z_strm.avail_out > 0) {
            // We only support non-streaming use case  for block decompressor
            ret = inflate(&z_strm, Z_FINISH);
            if (ret != Z_OK && ret != Z_STREAM_END) {
                (void)inflateEnd(&z_strm);
                // NOTE: these messages previously said "compress" (copy-paste
                // from the compress path); they now correctly say "decompress".
                if (ret == Z_MEM_ERROR) {
                    throw Exception(Status::MemoryLimitExceeded(
                            "Fail to do ZLib stream decompress, error={}, res={}", zError(ret),
                            ret));
                } else if (ret == Z_DATA_ERROR) {
                    return Status::InvalidArgument(
                            "Fail to do ZLib stream decompress, error={}, res={}", zError(ret),
                            ret);
                }
                return Status::InternalError("Fail to do ZLib stream decompress, error={}, res={}",
                                             zError(ret), ret);
            }
        }
        (void)inflateEnd(&z_strm);

        return Status::OK();
    }
1400
1401
95
    // Upper bound on the gzip-compressed size of `len` input bytes, computed
    // with the same deflate parameters the compress path uses.
    size_t max_compressed_len(size_t len) override {
        z_stream zstrm;
        zstrm.zalloc = Z_NULL;
        zstrm.zfree = Z_NULL;
        zstrm.opaque = Z_NULL;
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
                                 MEM_LEVEL, Z_DEFAULT_STRATEGY);
        if (zres != Z_OK) {
            // Fall back to zlib estimate logic for deflate, notice this may
            // cause decompress error
            LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres)
                         << ", res=" << zres;
            return ZlibBlockCompression::max_compressed_len(len);
        }
        // Query the bound while the stream state is still alive. The previous
        // code called deflateBound() *after* deflateEnd(), which operates on a
        // freed stream and makes zlib silently fall back to its generic,
        // wrapper-agnostic estimate instead of the gzip-aware one.
        //
        // Mark, maintainer of zlib, has stated that 12 needs to be added to
        // result for gzip
        // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
        // To have a safe upper bound for "wrapper variations", we add 32 to
        // estimate
        auto upper_bound = deflateBound(&zstrm, len) + 32;
        zres = deflateEnd(&zstrm);
        if (zres != Z_OK) {
            LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres)
                         << ", res=" << zres;
        }
        return upper_bound;
    }
1429
1430
private:
1431
    // Magic number for zlib, see https://zlib.net/manual.html for more details.
1432
    const static int GZIP_CODEC = 16; // gzip
1433
    // The memLevel parameter specifies how much memory should be allocated for
1434
    // the internal compression state.
1435
    const static int MEM_LEVEL = 8;
1436
};
1437
1438
// Only used on x86 or x86_64
1439
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
1440
        defined(__i386) || defined(_M_IX86)
1441
class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression {
1442
public:
1443
1
    GzipBlockCompressionByLibdeflate() : GzipBlockCompression() {}
1444
4
    static GzipBlockCompressionByLibdeflate* instance() {
1445
4
        static GzipBlockCompressionByLibdeflate s_instance;
1446
4
        return &s_instance;
1447
4
    }
1448
    ~GzipBlockCompressionByLibdeflate() override = default;
1449
1450
8
    Status decompress(const Slice& input, Slice* output) override {
1451
8
        if (input.empty()) {
1452
0
            output->size = 0;
1453
0
            return Status::OK();
1454
0
        }
1455
8
        thread_local std::unique_ptr<libdeflate_decompressor, void (*)(libdeflate_decompressor*)>
1456
8
                decompressor {libdeflate_alloc_decompressor(), libdeflate_free_decompressor};
1457
8
        if (!decompressor) {
1458
0
            return Status::InternalError("libdeflate_alloc_decompressor error.");
1459
0
        }
1460
8
        std::size_t out_len;
1461
8
        auto result = libdeflate_gzip_decompress(decompressor.get(), input.data, input.size,
1462
8
                                                 output->data, output->size, &out_len);
1463
8
        if (result != LIBDEFLATE_SUCCESS) {
1464
0
            return Status::InternalError("libdeflate_gzip_decompress error, res={}", result);
1465
0
        }
1466
8
        return Status::OK();
1467
8
    }
1468
};
1469
#endif
1470
1471
// Decompress-only LZO codec (used for reading LZO-compressed Parquet data).
// Compression is intentionally not implemented. Decompression is delegated
// to orc::lzoDecompress per small block.
class LzoBlockCompression final : public BlockCompressionCodec {
public:
    // Meyers-singleton accessor.
    static LzoBlockCompression* instance() {
        static LzoBlockCompression s_instance;
        return &s_instance;
    }

    // Not supported: always returns InvalidArgument.
    Status compress(const Slice& input, faststring* output) override {
        return Status::InvalidArgument("not impl lzo compress.");
    }
    // Compression is unimplemented, so there is no meaningful bound.
    size_t max_compressed_len(size_t len) override { return 0; };
    // Decompresses the framed LZO stream described below into `output`.
    // `output` must be pre-sized to (at least) the total uncompressed length.
    Status decompress(const Slice& input, Slice* output) override {
        auto* input_ptr = input.data;
        auto remain_input_size = input.size;
        auto* output_ptr = output->data;
        auto remain_output_size = output->size;
        auto* output_limit = output->data + output->size;

        // Example:
        // OriginData(The original data will be divided into several large data block.) :
        //      large data block1 | large data block2 | large data block3 | ....
        // The large data block will be divided into several small data block.
        // Suppose a large data block is divided into three small blocks:
        // large data block1:            | small block1 | small block2 | small block3 |
        // CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block1) ] [B3 compress(small block1)]>
        //
        // A : original length of the current block of large data block.
        // sizeof(A) = 4 bytes.
        // A = length(small block1) + length(small block2) + length(small block3)
        // Bx : length of  small data block bx.
        // sizeof(Bx) = 4 bytes.
        // Bx = length(compress(small blockx))
        try {
            while (remain_input_size > 0) {
                // Read header A: big-endian uncompressed size of this large block.
                if (remain_input_size < 4) {
                    return Status::InvalidArgument(
                            "Need more input buffer to get large_block_uncompressed_len.");
                }

                uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr);
                input_ptr += 4;
                remain_input_size -= 4;

                // Validate the whole large block fits in the remaining output.
                if (remain_output_size < large_block_uncompressed_len) {
                    return Status::InvalidArgument(
                            "Need more output buffer to get uncompressed data.");
                }

                // Consume small blocks until this large block is fully produced.
                while (large_block_uncompressed_len > 0) {
                    // Read header Bx: big-endian compressed size of the next small block.
                    if (remain_input_size < 4) {
                        return Status::InvalidArgument(
                                "Need more input buffer to get small_block_compressed_len.");
                    }

                    uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr);
                    input_ptr += 4;
                    remain_input_size -= 4;

                    if (remain_input_size < small_block_compressed_len) {
                        return Status::InvalidArgument(
                                "Need more input buffer to decompress small block.");
                    }

                    // Returns the number of bytes written to output_ptr; throws
                    // orc::ParseError on malformed data (caught below).
                    auto small_block_uncompressed_len =
                            orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len,
                                               output_ptr, output_limit);

                    input_ptr += small_block_compressed_len;
                    remain_input_size -= small_block_compressed_len;

                    output_ptr += small_block_uncompressed_len;
                    large_block_uncompressed_len -= small_block_uncompressed_len;
                    remain_output_size -= small_block_uncompressed_len;
                }
            }
        } catch (const orc::ParseError& e) {
            //Prevent be from hanging due to orc::lzoDecompress throw exception
            return Status::InternalError("Fail to do LZO decompress, error={}", e.what());
        }
        return Status::OK();
    }
};
1553
1554
class BrotliBlockCompression final : public BlockCompressionCodec {
1555
public:
1556
0
    static BrotliBlockCompression* instance() {
1557
0
        static BrotliBlockCompression s_instance;
1558
0
        return &s_instance;
1559
0
    }
1560
1561
0
    Status compress(const Slice& input, faststring* output) override {
1562
0
        return Status::InvalidArgument("not impl brotli compress.");
1563
0
    }
1564
1565
0
    size_t max_compressed_len(size_t len) override { return 0; };
1566
1567
0
    Status decompress(const Slice& input, Slice* output) override {
1568
        // The size of output buffer is always equal to the umcompressed length.
1569
0
        BrotliDecoderResult result = BrotliDecoderDecompress(
1570
0
                input.get_size(), reinterpret_cast<const uint8_t*>(input.get_data()), &output->size,
1571
0
                reinterpret_cast<uint8_t*>(output->data));
1572
0
        if (result != BROTLI_DECODER_RESULT_SUCCESS) {
1573
0
            return Status::InternalError("Brotli decompression failed, result={}", result);
1574
0
        }
1575
0
        return Status::OK();
1576
0
    }
1577
};
1578
1579
// Maps a segment_v2 compression type to its singleton codec.
// `*codec` is set to nullptr for NO_COMPRESSION; returns InternalError for
// any unrecognized type (leaving `*codec` untouched).
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                   BlockCompressionCodec** codec) {
    switch (type) {
    case segment_v2::CompressionTypePB::NO_COMPRESSION:
        *codec = nullptr;
        return Status::OK();
    case segment_v2::CompressionTypePB::SNAPPY:
        *codec = SnappyBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4:
        *codec = Lz4BlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4F:
        *codec = Lz4fBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4HC:
        *codec = Lz4HCBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::ZLIB:
        *codec = ZlibBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::ZSTD:
        *codec = ZstdBlockCompression::instance();
        return Status::OK();
    default:
        return Status::InternalError("unknown compression type({})", type);
    }
}
1609
1610
// this can only be used in hive text write
1611
172
Status get_block_compression_codec(TFileCompressType::type type, BlockCompressionCodec** codec) {
1612
172
    switch (type) {
1613
149
    case TFileCompressType::PLAIN:
1614
149
        *codec = nullptr;
1615
149
        break;
1616
0
    case TFileCompressType::ZLIB:
1617
0
        *codec = ZlibBlockCompression::instance();
1618
0
        break;
1619
7
    case TFileCompressType::GZ:
1620
7
        *codec = GzipBlockCompression::instance();
1621
7
        break;
1622
4
    case TFileCompressType::BZ2:
1623
4
        *codec = Bzip2BlockCompression::instance();
1624
4
        break;
1625
4
    case TFileCompressType::LZ4BLOCK:
1626
4
        *codec = HadoopLz4BlockCompression::instance();
1627
4
        break;
1628
4
    case TFileCompressType::SNAPPYBLOCK:
1629
4
        *codec = HadoopSnappyBlockCompression::instance();
1630
4
        break;
1631
4
    case TFileCompressType::ZSTD:
1632
4
        *codec = ZstdBlockCompression::instance();
1633
4
        break;
1634
0
    default:
1635
0
        return Status::InternalError("unsupport compression type({}) int hive text", type);
1636
172
    }
1637
1638
172
    return Status::OK();
1639
172
}
1640
1641
// Maps a Parquet codec id to its singleton codec; UNCOMPRESSED maps to
// nullptr. GZIP prefers the libdeflate-backed codec on x86/x86_64 builds.
// Returns InternalError for any unrecognized codec id.
Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec,
                                   BlockCompressionCodec** codec) {
    switch (parquet_codec) {
    case tparquet::CompressionCodec::UNCOMPRESSED:
        *codec = nullptr;
        return Status::OK();
    case tparquet::CompressionCodec::SNAPPY:
        *codec = SnappyBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::LZ4_RAW: // we can use LZ4 compression algorithm parse LZ4_RAW
    case tparquet::CompressionCodec::LZ4:
        *codec = HadoopLz4BlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::ZSTD:
        *codec = ZstdBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::GZIP:
// Only used on x86 or x86_64
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
        defined(__i386) || defined(_M_IX86)
        *codec = GzipBlockCompressionByLibdeflate::instance();
#else
        *codec = GzipBlockCompression::instance();
#endif
        return Status::OK();
    case tparquet::CompressionCodec::LZO:
        *codec = LzoBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::BROTLI:
        *codec = BrotliBlockCompression::instance();
        return Status::OK();
    default:
        return Status::InternalError("unknown compression type({})", parquet_codec);
    }
}
1678
1679
} // namespace doris