Coverage Report

Created: 2025-09-16 21:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/util/block_compression.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "util/block_compression.h"
19
20
#include <bzlib.h>
21
#include <gen_cpp/parquet_types.h>
22
#include <gen_cpp/segment_v2.pb.h>
23
#include <glog/logging.h>
24
25
#include <exception>
26
// Only used on x86 or x86_64
27
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
28
        defined(__i386) || defined(_M_IX86)
29
#include <libdeflate.h>
30
#endif
31
#include <brotli/decode.h>
32
#include <glog/log_severity.h>
33
#include <glog/logging.h>
34
#include <lz4/lz4.h>
35
#include <lz4/lz4frame.h>
36
#include <lz4/lz4hc.h>
37
#include <snappy/snappy-sinksource.h>
38
#include <snappy/snappy.h>
39
#include <zconf.h>
40
#include <zlib.h>
41
#include <zstd.h>
42
#include <zstd_errors.h>
43
44
#include <algorithm>
45
#include <cstdint>
46
#include <limits>
47
#include <mutex>
48
#include <orc/Exceptions.hh>
49
#include <ostream>
50
51
#include "absl/strings/substitute.h"
52
#include "common/config.h"
53
#include "common/factory_creator.h"
54
#include "exec/decompressor.h"
55
#include "runtime/thread_context.h"
56
#include "util/defer_op.h"
57
#include "util/faststring.h"
58
#include "vec/common/endian.h"
59
60
namespace orc {
61
/**
62
 * Decompress the bytes in to the output buffer.
63
 * @param inputAddress the start of the input
64
 * @param inputLimit one past the last byte of the input
65
 * @param outputAddress the start of the output buffer
66
 * @param outputLimit one past the last byte of the output buffer
67
 * @result the number of bytes decompressed
68
 */
69
uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
70
                       char* outputLimit);
71
} // namespace orc
72
73
namespace doris {
74
#include "common/compile_check_begin.h"
75
76
// exception safe
77
Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
78
142
                                       faststring* output) {
79
142
    faststring buf;
80
    // we compute total size to avoid more memory copy
81
142
    buf.reserve(uncompressed_size);
82
172
    for (auto& input : inputs) {
83
172
        buf.append(input.data, input.size);
84
172
    }
85
142
    return compress(buf, output);
86
142
}
87
88
22.9k
bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) {
89
22.9k
    return uncompressed_size > std::numeric_limits<int32_t>::max();
90
22.9k
}
91
92
class Lz4BlockCompression : public BlockCompressionCodec {
93
private:
94
    class Context {
95
        ENABLE_FACTORY_CREATOR(Context);
96
97
    public:
98
1
        Context() : ctx(nullptr) {
99
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
100
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
101
1
            buffer = std::make_unique<faststring>();
102
1
        }
103
        LZ4_stream_t* ctx;
104
        std::unique_ptr<faststring> buffer;
105
1
        ~Context() {
106
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
107
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
108
1
            if (ctx) {
109
1
                LZ4_freeStream(ctx);
110
1
            }
111
1
            buffer.reset();
112
1
        }
113
    };
114
115
public:
116
368
    static Lz4BlockCompression* instance() {
117
368
        static Lz4BlockCompression s_instance;
118
368
        return &s_instance;
119
368
    }
120
1
    ~Lz4BlockCompression() override {
121
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
122
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
123
1
        _ctx_pool.clear();
124
1
    }
125
126
190
    Status compress(const Slice& input, faststring* output) override {
127
190
        if (input.size > LZ4_MAX_INPUT_SIZE) {
128
0
            return Status::InvalidArgument(
129
0
                    "LZ4 not support those case(input.size>LZ4_MAX_INPUT_SIZE), maybe you should "
130
0
                    "change "
131
0
                    "fragment_transmission_compression_codec to snappy, input.size={}, "
132
0
                    "LZ4_MAX_INPUT_SIZE={}",
133
0
                    input.size, LZ4_MAX_INPUT_SIZE);
134
0
        }
135
136
190
        std::unique_ptr<Context> context;
137
190
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
138
190
        bool compress_failed = false;
139
190
        Defer defer {[&] {
140
190
            if (!compress_failed) {
141
190
                _release_compression_ctx(std::move(context));
142
190
            }
143
190
        }};
144
145
190
        try {
146
190
            Slice compressed_buf;
147
190
            size_t max_len = max_compressed_len(input.size);
148
190
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
149
                // use output directly
150
0
                output->resize(max_len);
151
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
152
0
                compressed_buf.size = max_len;
153
190
            } else {
154
                // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
155
190
                {
156
                    // context->buffer is resuable between queries, should accouting to
157
                    // global tracker.
158
190
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
159
190
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
160
190
                    context->buffer->resize(max_len);
161
190
                }
162
190
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
163
190
                compressed_buf.size = max_len;
164
190
            }
165
166
            // input.size is aready checked before;
167
            // compressed_buf.size is got from max_compressed_len, which is
168
            // the return value of LZ4_compressBound, so it is safe to cast to int
169
190
            size_t compressed_len = LZ4_compress_fast_continue(
170
190
                    context->ctx, input.data, compressed_buf.data, static_cast<int>(input.size),
171
190
                    static_cast<int>(compressed_buf.size), ACCELARATION);
172
190
            if (compressed_len == 0) {
173
0
                compress_failed = true;
174
0
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
175
0
                                               compressed_buf.size);
176
0
            }
177
190
            output->resize(compressed_len);
178
190
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
179
190
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
180
190
                                    compressed_len);
181
190
            }
182
190
        } catch (...) {
183
            // Do not set compress_failed to release context
184
0
            DCHECK(!compress_failed);
185
0
            return Status::InternalError("Fail to do LZ4Block compress due to exception");
186
0
        }
187
190
        return Status::OK();
188
190
    }
189
190
65
    Status decompress(const Slice& input, Slice* output) override {
191
65
        auto decompressed_len = LZ4_decompress_safe(
192
65
                input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size));
193
65
        if (decompressed_len < 0) {
194
5
            return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len);
195
5
        }
196
60
        output->size = decompressed_len;
197
60
        return Status::OK();
198
65
    }
199
200
190
    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); }
201
202
private:
203
    // reuse LZ4 compress stream
204
190
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
205
190
        std::lock_guard<std::mutex> l(_ctx_mutex);
206
190
        if (_ctx_pool.empty()) {
207
1
            std::unique_ptr<Context> localCtx = Context::create_unique();
208
1
            if (localCtx.get() == nullptr) {
209
0
                return Status::InvalidArgument("new LZ4 context error");
210
0
            }
211
1
            localCtx->ctx = LZ4_createStream();
212
1
            if (localCtx->ctx == nullptr) {
213
0
                return Status::InvalidArgument("LZ4_createStream error");
214
0
            }
215
1
            out = std::move(localCtx);
216
1
            return Status::OK();
217
1
        }
218
189
        out = std::move(_ctx_pool.back());
219
189
        _ctx_pool.pop_back();
220
189
        return Status::OK();
221
190
    }
222
190
    void _release_compression_ctx(std::unique_ptr<Context> context) {
223
190
        DCHECK(context);
224
190
        LZ4_resetStream(context->ctx);
225
190
        std::lock_guard<std::mutex> l(_ctx_mutex);
226
190
        _ctx_pool.push_back(std::move(context));
227
190
    }
228
229
private:
230
    mutable std::mutex _ctx_mutex;
231
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
232
    static const int32_t ACCELARATION = 1;
233
};
234
235
class HadoopLz4BlockCompression : public Lz4BlockCompression {
236
public:
237
0
    HadoopLz4BlockCompression() {
238
0
        Status st = Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor);
239
0
        if (!st.ok()) {
240
0
            throw Exception(Status::FatalError(
241
0
                    "HadoopLz4BlockCompression construction failed. status = {}", st));
242
0
        }
243
0
    }
244
245
0
    ~HadoopLz4BlockCompression() override = default;
246
247
0
    static HadoopLz4BlockCompression* instance() {
248
0
        static HadoopLz4BlockCompression s_instance;
249
0
        return &s_instance;
250
0
    }
251
252
    // hadoop use block compression for lz4
253
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
254
0
    Status compress(const Slice& input, faststring* output) override {
255
        // be same with hadop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
256
0
        size_t lz4_block_size = config::lz4_compression_block_size;
257
0
        size_t overhead = lz4_block_size / 255 + 16;
258
0
        size_t max_input_size = lz4_block_size - overhead;
259
260
0
        size_t data_len = input.size;
261
0
        char* data = input.data;
262
0
        std::vector<OwnedSlice> buffers;
263
0
        size_t out_len = 0;
264
265
0
        while (data_len > 0) {
266
0
            size_t input_size = std::min(data_len, max_input_size);
267
0
            Slice input_slice(data, input_size);
268
0
            faststring output_data;
269
0
            RETURN_IF_ERROR(Lz4BlockCompression::compress(input_slice, &output_data));
270
0
            out_len += output_data.size();
271
0
            buffers.push_back(output_data.build());
272
0
            data += input_size;
273
0
            data_len -= input_size;
274
0
        }
275
276
        // hadoop block compression: umcompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
277
0
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
278
0
        output->resize(total_output_len);
279
0
        char* output_buffer = (char*)output->data();
280
0
        BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size()));
281
0
        output_buffer += 4;
282
0
        for (const auto& buffer : buffers) {
283
0
            auto slice = buffer.slice();
284
0
            BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size()));
285
0
            output_buffer += 4;
286
0
            memcpy(output_buffer, slice.get_data(), slice.get_size());
287
0
            output_buffer += slice.get_size();
288
0
        }
289
290
0
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);
291
292
0
        return Status::OK();
293
0
    }
294
295
0
    Status decompress(const Slice& input, Slice* output) override {
296
0
        size_t input_bytes_read = 0;
297
0
        size_t decompressed_len = 0;
298
0
        size_t more_input_bytes = 0;
299
0
        size_t more_output_bytes = 0;
300
0
        bool stream_end = false;
301
0
        auto st = _decompressor->decompress((uint8_t*)input.data, cast_set<uint32_t>(input.size),
302
0
                                            &input_bytes_read, (uint8_t*)output->data,
303
0
                                            cast_set<uint32_t>(output->size), &decompressed_len,
304
0
                                            &stream_end, &more_input_bytes, &more_output_bytes);
305
        //try decompress use hadoopLz4 ,if failed fall back lz4.
306
0
        return (st != Status::OK() || stream_end != true)
307
0
                       ? Lz4BlockCompression::decompress(input, output)
308
0
                       : Status::OK();
309
0
    }
310
311
private:
312
    std::unique_ptr<Decompressor> _decompressor;
313
};
314
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
315
class Lz4fBlockCompression : public BlockCompressionCodec {
316
private:
317
    class CContext {
318
        ENABLE_FACTORY_CREATOR(CContext);
319
320
    public:
321
1
        CContext() : ctx(nullptr) {
322
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
323
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
324
1
            buffer = std::make_unique<faststring>();
325
1
        }
326
        LZ4F_compressionContext_t ctx;
327
        std::unique_ptr<faststring> buffer;
328
1
        ~CContext() {
329
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
330
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
331
1
            if (ctx) {
332
1
                LZ4F_freeCompressionContext(ctx);
333
1
            }
334
1
            buffer.reset();
335
1
        }
336
    };
337
    class DContext {
338
        ENABLE_FACTORY_CREATOR(DContext);
339
340
    public:
341
6
        DContext() : ctx(nullptr) {}
342
        LZ4F_decompressionContext_t ctx;
343
6
        ~DContext() {
344
6
            if (ctx) {
345
6
                LZ4F_freeDecompressionContext(ctx);
346
6
            }
347
6
        }
348
    };
349
350
public:
351
28.4k
    static Lz4fBlockCompression* instance() {
352
28.4k
        static Lz4fBlockCompression s_instance;
353
28.4k
        return &s_instance;
354
28.4k
    }
355
1
    ~Lz4fBlockCompression() {
356
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
357
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
358
1
        _ctx_c_pool.clear();
359
1
        _ctx_d_pool.clear();
360
1
    }
361
362
5
    Status compress(const Slice& input, faststring* output) override {
363
5
        std::vector<Slice> inputs {input};
364
5
        return compress(inputs, input.size, output);
365
5
    }
366
367
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
368
22.6k
                    faststring* output) override {
369
22.6k
        return _compress(inputs, uncompressed_size, output);
370
22.6k
    }
371
372
8.51k
    Status decompress(const Slice& input, Slice* output) override {
373
8.51k
        return _decompress(input, output);
374
8.51k
    }
375
376
22.6k
    size_t max_compressed_len(size_t len) override {
377
22.6k
        return std::max(LZ4F_compressBound(len, &_s_preferences),
378
22.6k
                        LZ4F_compressFrameBound(len, &_s_preferences));
379
22.6k
    }
380
381
private:
382
    Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
383
22.6k
                     faststring* output) {
384
22.6k
        std::unique_ptr<CContext> context;
385
22.6k
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
386
22.6k
        bool compress_failed = false;
387
22.6k
        Defer defer {[&] {
388
22.6k
            if (!compress_failed) {
389
22.6k
                _release_compression_ctx(std::move(context));
390
22.6k
            }
391
22.6k
        }};
392
393
22.6k
        try {
394
22.6k
            Slice compressed_buf;
395
22.6k
            size_t max_len = max_compressed_len(uncompressed_size);
396
22.6k
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
397
                // use output directly
398
0
                output->resize(max_len);
399
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
400
0
                compressed_buf.size = max_len;
401
22.6k
            } else {
402
22.6k
                {
403
22.6k
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
404
22.6k
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
405
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
406
22.6k
                    context->buffer->resize(max_len);
407
22.6k
                }
408
22.6k
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
409
22.6k
                compressed_buf.size = max_len;
410
22.6k
            }
411
412
22.6k
            auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
413
22.6k
                                             &_s_preferences);
414
22.6k
            if (LZ4F_isError(wbytes)) {
415
0
                compress_failed = true;
416
0
                return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
417
0
                                               LZ4F_getErrorName(wbytes));
418
0
            }
419
22.6k
            size_t offset = wbytes;
420
23.1k
            for (auto input : inputs) {
421
23.1k
                wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
422
23.1k
                                             compressed_buf.size - offset, input.data, input.size,
423
23.1k
                                             nullptr);
424
23.1k
                if (LZ4F_isError(wbytes)) {
425
0
                    compress_failed = true;
426
0
                    return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
427
0
                                                   LZ4F_getErrorName(wbytes));
428
0
                }
429
23.1k
                offset += wbytes;
430
23.1k
            }
431
22.6k
            wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
432
22.6k
                                      compressed_buf.size - offset, nullptr);
433
22.6k
            if (LZ4F_isError(wbytes)) {
434
0
                compress_failed = true;
435
0
                return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
436
0
                                               LZ4F_getErrorName(wbytes));
437
0
            }
438
22.6k
            offset += wbytes;
439
22.6k
            output->resize(offset);
440
22.6k
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
441
22.6k
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
442
22.6k
            }
443
22.6k
        } catch (...) {
444
            // Do not set compress_failed to release context
445
0
            DCHECK(!compress_failed);
446
0
            return Status::InternalError("Fail to do LZ4F compress due to exception");
447
0
        }
448
449
22.6k
        return Status::OK();
450
22.6k
    }
451
452
8.51k
    Status _decompress(const Slice& input, Slice* output) {
453
8.51k
        bool decompress_failed = false;
454
8.51k
        std::unique_ptr<DContext> context;
455
8.51k
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
456
8.51k
        Defer defer {[&] {
457
8.51k
            if (!decompress_failed) {
458
8.51k
                _release_decompression_ctx(std::move(context));
459
8.51k
            }
460
8.51k
        }};
461
8.51k
        size_t input_size = input.size;
462
8.51k
        auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data,
463
8.51k
                                    &input_size, nullptr);
464
8.51k
        if (LZ4F_isError(lres)) {
465
0
            decompress_failed = true;
466
0
            return Status::InternalError("Fail to do LZ4F decompress, res={}",
467
0
                                         LZ4F_getErrorName(lres));
468
8.51k
        } else if (input_size != input.size) {
469
0
            decompress_failed = true;
470
0
            return Status::InvalidArgument(
471
0
                    absl::Substitute("Fail to do LZ4F decompress: trailing data left in "
472
0
                                     "compressed data, read=$0 vs given=$1",
473
0
                                     input_size, input.size));
474
8.51k
        } else if (lres != 0) {
475
5
            decompress_failed = true;
476
5
            return Status::InvalidArgument(
477
5
                    "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres);
478
5
        }
479
8.51k
        return Status::OK();
480
8.51k
    }
481
482
private:
483
    // acquire a compression ctx from pool, release while finish compress,
484
    // delete if compression failed
485
22.6k
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
486
22.6k
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
487
22.6k
        if (_ctx_c_pool.empty()) {
488
1
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
489
1
            if (localCtx.get() == nullptr) {
490
0
                return Status::InvalidArgument("failed to new LZ4F CContext");
491
0
            }
492
1
            auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION);
493
1
            if (LZ4F_isError(res) != 0) {
494
0
                return Status::InvalidArgument(absl::Substitute(
495
0
                        "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res)));
496
0
            }
497
1
            out = std::move(localCtx);
498
1
            return Status::OK();
499
1
        }
500
22.6k
        out = std::move(_ctx_c_pool.back());
501
22.6k
        _ctx_c_pool.pop_back();
502
22.6k
        return Status::OK();
503
22.6k
    }
504
22.6k
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
505
22.6k
        DCHECK(context);
506
22.6k
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
507
22.6k
        _ctx_c_pool.push_back(std::move(context));
508
22.6k
    }
509
510
8.51k
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
511
8.51k
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
512
8.51k
        if (_ctx_d_pool.empty()) {
513
6
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
514
6
            if (localCtx.get() == nullptr) {
515
0
                return Status::InvalidArgument("failed to new LZ4F DContext");
516
0
            }
517
6
            auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION);
518
6
            if (LZ4F_isError(res) != 0) {
519
0
                return Status::InvalidArgument(absl::Substitute(
520
0
                        "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res)));
521
0
            }
522
6
            out = std::move(localCtx);
523
6
            return Status::OK();
524
6
        }
525
8.51k
        out = std::move(_ctx_d_pool.back());
526
8.51k
        _ctx_d_pool.pop_back();
527
8.51k
        return Status::OK();
528
8.51k
    }
529
8.51k
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
530
8.51k
        DCHECK(context);
531
        // reset decompression context to avoid ERROR_maxBlockSize_invalid
532
8.51k
        LZ4F_resetDecompressionContext(context->ctx);
533
8.51k
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
534
8.51k
        _ctx_d_pool.push_back(std::move(context));
535
8.51k
    }
536
537
private:
538
    static LZ4F_preferences_t _s_preferences;
539
540
    std::mutex _ctx_c_mutex;
541
    // LZ4F_compressionContext_t is a pointer so no copy here
542
    std::vector<std::unique_ptr<CContext>> _ctx_c_pool;
543
544
    std::mutex _ctx_d_mutex;
545
    std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
546
};
547
548
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
549
        {LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0ULL, 0U,
550
         LZ4F_noBlockChecksum},
551
        0,
552
        0u,
553
        0u,
554
        {0u, 0u, 0u}};
555
556
class Lz4HCBlockCompression : public BlockCompressionCodec {
557
private:
558
    class Context {
559
        ENABLE_FACTORY_CREATOR(Context);
560
561
    public:
562
1
        Context() : ctx(nullptr) {
563
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
564
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
565
1
            buffer = std::make_unique<faststring>();
566
1
        }
567
        LZ4_streamHC_t* ctx;
568
        std::unique_ptr<faststring> buffer;
569
1
        ~Context() {
570
1
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
571
1
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
572
1
            if (ctx) {
573
1
                LZ4_freeStreamHC(ctx);
574
1
            }
575
1
            buffer.reset();
576
1
        }
577
    };
578
579
public:
580
2
    static Lz4HCBlockCompression* instance() {
581
2
        static Lz4HCBlockCompression s_instance;
582
2
        return &s_instance;
583
2
    }
584
1
    ~Lz4HCBlockCompression() {
585
1
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
586
1
                ExecEnv::GetInstance()->block_compression_mem_tracker());
587
1
        _ctx_pool.clear();
588
1
    }
589
590
6
    Status compress(const Slice& input, faststring* output) override {
591
6
        std::unique_ptr<Context> context;
592
6
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
593
6
        bool compress_failed = false;
594
6
        Defer defer {[&] {
595
6
            if (!compress_failed) {
596
6
                _release_compression_ctx(std::move(context));
597
6
            }
598
6
        }};
599
600
6
        try {
601
6
            Slice compressed_buf;
602
6
            size_t max_len = max_compressed_len(input.size);
603
6
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
604
                // use output directly
605
0
                output->resize(max_len);
606
0
                compressed_buf.data = reinterpret_cast<char*>(output->data());
607
0
                compressed_buf.size = max_len;
608
6
            } else {
609
6
                {
610
6
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
611
6
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
612
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
613
6
                    context->buffer->resize(max_len);
614
6
                }
615
6
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
616
6
                compressed_buf.size = max_len;
617
6
            }
618
619
6
            size_t compressed_len = LZ4_compress_HC_continue(
620
6
                    context->ctx, input.data, compressed_buf.data, cast_set<int>(input.size),
621
6
                    static_cast<int>(compressed_buf.size));
622
6
            if (compressed_len == 0) {
623
0
                compress_failed = true;
624
0
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
625
0
                                               compressed_buf.size);
626
0
            }
627
6
            output->resize(compressed_len);
628
6
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
629
6
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
630
6
                                    compressed_len);
631
6
            }
632
6
        } catch (...) {
633
            // Do not set compress_failed to release context
634
0
            DCHECK(!compress_failed);
635
0
            return Status::InternalError("Fail to do LZ4HC compress due to exception");
636
0
        }
637
6
        return Status::OK();
638
6
    }
639
640
11
    Status decompress(const Slice& input, Slice* output) override {
641
11
        auto decompressed_len = LZ4_decompress_safe(
642
11
                input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size));
643
11
        if (decompressed_len < 0) {
644
5
            return Status::InvalidArgument(
645
5
                    "destination buffer is not large enough or the source stream is detected "
646
5
                    "malformed, fail to do LZ4 decompress, error={}",
647
5
                    decompressed_len);
648
5
        }
649
6
        output->size = decompressed_len;
650
6
        return Status::OK();
651
11
    }
652
653
6
    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); }
654
655
private:
656
6
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
657
6
        std::lock_guard<std::mutex> l(_ctx_mutex);
658
6
        if (_ctx_pool.empty()) {
659
1
            std::unique_ptr<Context> localCtx = Context::create_unique();
660
1
            if (localCtx.get() == nullptr) {
661
0
                return Status::InvalidArgument("new LZ4HC context error");
662
0
            }
663
1
            localCtx->ctx = LZ4_createStreamHC();
664
1
            if (localCtx->ctx == nullptr) {
665
0
                return Status::InvalidArgument("LZ4_createStreamHC error");
666
0
            }
667
1
            out = std::move(localCtx);
668
1
            return Status::OK();
669
1
        }
670
5
        out = std::move(_ctx_pool.back());
671
5
        _ctx_pool.pop_back();
672
5
        return Status::OK();
673
6
    }
674
6
    void _release_compression_ctx(std::unique_ptr<Context> context) {
675
6
        DCHECK(context);
676
6
        LZ4_resetStreamHC_fast(context->ctx, static_cast<int>(_compression_level));
677
6
        std::lock_guard<std::mutex> l(_ctx_mutex);
678
6
        _ctx_pool.push_back(std::move(context));
679
6
    }
680
681
private:
682
    int64_t _compression_level = config::LZ4_HC_compression_level;
683
    mutable std::mutex _ctx_mutex;
684
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
685
};
686
687
class SnappySlicesSource : public snappy::Source {
688
public:
689
    SnappySlicesSource(const std::vector<Slice>& slices)
690
1
            : _available(0), _cur_slice(0), _slice_off(0) {
691
5
        for (auto& slice : slices) {
692
            // We filter empty slice here to avoid complicated process
693
5
            if (slice.size == 0) {
694
1
                continue;
695
1
            }
696
4
            _available += slice.size;
697
4
            _slices.push_back(slice);
698
4
        }
699
1
    }
700
1
    ~SnappySlicesSource() override {}
701
702
    // Return the number of bytes left to read from the source
703
1
    size_t Available() const override { return _available; }
704
705
    // Peek at the next flat region of the source.  Does not reposition
706
    // the source.  The returned region is empty iff Available()==0.
707
    //
708
    // Returns a pointer to the beginning of the region and store its
709
    // length in *len.
710
    //
711
    // The returned region is valid until the next call to Skip() or
712
    // until this object is destroyed, whichever occurs first.
713
    //
714
    // The returned region may be larger than Available() (for example
715
    // if this ByteSource is a view on a substring of a larger source).
716
    // The caller is responsible for ensuring that it only reads the
717
    // Available() bytes.
718
19
    const char* Peek(size_t* len) override {
719
19
        if (_available == 0) {
720
0
            *len = 0;
721
0
            return nullptr;
722
0
        }
723
        // we should assure that *len is not 0
724
19
        *len = _slices[_cur_slice].size - _slice_off;
725
19
        DCHECK(*len != 0);
726
19
        return _slices[_cur_slice].data + _slice_off;
727
19
    }
728
729
    // Skip the next n bytes.  Invalidates any buffer returned by
730
    // a previous call to Peek().
731
    // REQUIRES: Available() >= n
732
20
    void Skip(size_t n) override {
733
20
        _available -= n;
734
24
        while (n > 0) {
735
19
            auto left = _slices[_cur_slice].size - _slice_off;
736
19
            if (left > n) {
737
                // n can be digest in current slice
738
15
                _slice_off += n;
739
15
                return;
740
15
            }
741
4
            _slice_off = 0;
742
4
            _cur_slice++;
743
4
            n -= left;
744
4
        }
745
20
    }
746
747
private:
748
    std::vector<Slice> _slices;
749
    size_t _available;
750
    size_t _cur_slice;
751
    size_t _slice_off;
752
};
753
754
class SnappyBlockCompression : public BlockCompressionCodec {
755
public:
756
155
    static SnappyBlockCompression* instance() {
757
155
        static SnappyBlockCompression s_instance;
758
155
        return &s_instance;
759
155
    }
760
1
    ~SnappyBlockCompression() override = default;
761
762
34
    Status compress(const Slice& input, faststring* output) override {
763
34
        size_t max_len = max_compressed_len(input.size);
764
34
        output->resize(max_len);
765
34
        Slice s(*output);
766
767
34
        snappy::RawCompress(input.data, input.size, s.data, &s.size);
768
34
        output->resize(s.size);
769
34
        return Status::OK();
770
34
    }
771
772
145
    Status decompress(const Slice& input, Slice* output) override {
773
145
        if (!snappy::RawUncompress(input.data, input.size, output->data)) {
774
0
            return Status::InvalidArgument("Fail to do Snappy decompress");
775
0
        }
776
        // NOTE: GetUncompressedLength only takes O(1) time
777
145
        snappy::GetUncompressedLength(input.data, input.size, &output->size);
778
145
        return Status::OK();
779
145
    }
780
781
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
782
1
                    faststring* output) override {
783
1
        auto max_len = max_compressed_len(uncompressed_size);
784
1
        output->resize(max_len);
785
786
1
        SnappySlicesSource source(inputs);
787
1
        snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data()));
788
1
        output->resize(snappy::Compress(&source, &sink));
789
1
        return Status::OK();
790
1
    }
791
792
35
    size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); }
793
};
794
795
// Snappy codec that emits Hadoop's block-compressed framing instead of one
// raw snappy stream. Decompression is not implemented here.
class HadoopSnappyBlockCompression : public SnappyBlockCompression {
public:
    static HadoopSnappyBlockCompression* instance() {
        static HadoopSnappyBlockCompression s_instance;
        return &s_instance;
    }
    ~HadoopSnappyBlockCompression() override = default;

    // hadoop use block compression for snappy
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
    Status compress(const Slice& input, faststring* output) override {
        // be same with hadop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
        // The chunk size is reduced by a per-chunk overhead estimate so each
        // compressed chunk stays within the configured block size.
        size_t snappy_block_size = config::snappy_compression_block_size;
        size_t overhead = snappy_block_size / 6 + 32;
        size_t max_input_size = snappy_block_size - overhead;

        size_t data_len = input.size;
        char* data = input.data;
        std::vector<OwnedSlice> buffers;
        size_t out_len = 0;

        // Pass 1: compress the input chunk by chunk with plain raw snappy,
        // collecting the compressed chunks and their total size.
        while (data_len > 0) {
            size_t input_size = std::min(data_len, max_input_size);
            Slice input_slice(data, input_size);
            faststring output_data;
            RETURN_IF_ERROR(SnappyBlockCompression::compress(input_slice, &output_data));
            out_len += output_data.size();
            // the OwnedSlice will be moved here
            buffers.push_back(output_data.build());
            data += input_size;
            data_len -= input_size;
        }

        // Pass 2: assemble the Hadoop frame. All length fields are 32-bit
        // big-endian. Layout:
        // hadoop block compression: umcompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
        output->resize(total_output_len);
        char* output_buffer = (char*)output->data();
        BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size()));
        output_buffer += 4;
        for (const auto& buffer : buffers) {
            auto slice = buffer.slice();
            BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size()));
            output_buffer += 4;
            memcpy(output_buffer, slice.get_data(), slice.get_size());
            output_buffer += slice.get_size();
        }

        // Sanity check: we wrote exactly the number of bytes we reserved.
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);

        return Status::OK();
    }

    // Decompression of Hadoop-framed snappy is handled elsewhere; calling
    // this is an error.
    Status decompress(const Slice& input, Slice* output) override {
        return Status::InternalError("unimplement: SnappyHadoopBlockCompression::decompress");
    }
};
851
852
class ZlibBlockCompression : public BlockCompressionCodec {
853
public:
854
2
    static ZlibBlockCompression* instance() {
855
2
        static ZlibBlockCompression s_instance;
856
2
        return &s_instance;
857
2
    }
858
1
    ~ZlibBlockCompression() override = default;
859
860
5
    Status compress(const Slice& input, faststring* output) override {
861
5
        size_t max_len = max_compressed_len(input.size);
862
5
        output->resize(max_len);
863
5
        Slice s(*output);
864
865
5
        auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size);
866
5
        if (zres == Z_MEM_ERROR) {
867
0
            throw Exception(Status::MemoryLimitExceeded(fmt::format(
868
0
                    "ZLib compression failed due to memory allocationerror.error = {}, res = {} ",
869
0
                    zError(zres), zres)));
870
5
        } else if (zres != Z_OK) {
871
0
            return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres));
872
0
        }
873
5
        output->resize(s.size);
874
5
        return Status::OK();
875
5
    }
876
877
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
878
1
                    faststring* output) override {
879
1
        size_t max_len = max_compressed_len(uncompressed_size);
880
1
        output->resize(max_len);
881
882
1
        z_stream zstrm;
883
1
        zstrm.zalloc = Z_NULL;
884
1
        zstrm.zfree = Z_NULL;
885
1
        zstrm.opaque = Z_NULL;
886
1
        auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
887
1
        if (zres == Z_MEM_ERROR) {
888
0
            throw Exception(Status::MemoryLimitExceeded(
889
0
                    "Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres));
890
1
        } else if (zres != Z_OK) {
891
0
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
892
0
                                         zError(zres), zres);
893
0
        }
894
        // we assume that output is e
895
1
        zstrm.next_out = (Bytef*)output->data();
896
1
        zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size());
897
6
        for (int i = 0; i < inputs.size(); ++i) {
898
5
            if (inputs[i].size == 0) {
899
1
                continue;
900
1
            }
901
4
            zstrm.next_in = (Bytef*)inputs[i].data;
902
4
            zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size);
903
4
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;
904
905
4
            zres = deflate(&zstrm, flush);
906
4
            if (zres != Z_OK && zres != Z_STREAM_END) {
907
0
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
908
0
                                             zError(zres), zres);
909
0
            }
910
4
        }
911
912
1
        output->resize(zstrm.total_out);
913
1
        zres = deflateEnd(&zstrm);
914
1
        if (zres == Z_DATA_ERROR) {
915
0
            return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres),
916
0
                                           zres);
917
1
        } else if (zres != Z_OK) {
918
0
            return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
919
0
                                         zError(zres), zres);
920
0
        }
921
1
        return Status::OK();
922
1
    }
923
924
14
    Status decompress(const Slice& input, Slice* output) override {
925
14
        size_t input_size = input.size;
926
14
        auto zres =
927
14
                ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
928
14
        if (zres == Z_DATA_ERROR) {
929
1
            return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres));
930
13
        } else if (zres == Z_MEM_ERROR) {
931
0
            throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}",
932
0
                                                        zError(zres)));
933
13
        } else if (zres != Z_OK) {
934
7
            return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres));
935
7
        }
936
6
        return Status::OK();
937
14
    }
938
939
6
    size_t max_compressed_len(size_t len) override {
940
        // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block
941
6
        return len + 6 + 5 * ((len >> 14) + 1);
942
6
    }
943
};
944
945
class Bzip2BlockCompression : public BlockCompressionCodec {
946
public:
947
0
    static Bzip2BlockCompression* instance() {
948
0
        static Bzip2BlockCompression s_instance;
949
0
        return &s_instance;
950
0
    }
951
0
    ~Bzip2BlockCompression() override = default;
952
953
0
    Status compress(const Slice& input, faststring* output) override {
954
0
        size_t max_len = max_compressed_len(input.size);
955
0
        output->resize(max_len);
956
0
        auto size = cast_set<uint32_t>(output->size());
957
0
        auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data,
958
0
                                              cast_set<uint32_t>(input.size), 9, 0, 0);
959
0
        if (bzres == BZ_MEM_ERROR) {
960
0
            throw Exception(
961
0
                    Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres));
962
0
        } else if (bzres == BZ_PARAM_ERROR) {
963
0
            return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres);
964
0
        } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
965
0
                   bzres != BZ_STREAM_END && bzres != BZ_OK) {
966
0
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
967
0
        }
968
0
        output->resize(size);
969
0
        return Status::OK();
970
0
    }
971
972
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
973
0
                    faststring* output) override {
974
0
        size_t max_len = max_compressed_len(uncompressed_size);
975
0
        output->resize(max_len);
976
977
0
        bz_stream bzstrm;
978
0
        bzero(&bzstrm, sizeof(bzstrm));
979
0
        int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0);
980
0
        if (bzres == BZ_PARAM_ERROR) {
981
0
            return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
982
0
        } else if (bzres == BZ_MEM_ERROR) {
983
0
            throw Exception(
984
0
                    Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres));
985
0
        } else if (bzres != BZ_OK) {
986
0
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
987
0
        }
988
        // we assume that output is e
989
0
        bzstrm.next_out = (char*)output->data();
990
0
        bzstrm.avail_out = cast_set<uint32_t>(output->size());
991
0
        for (int i = 0; i < inputs.size(); ++i) {
992
0
            if (inputs[i].size == 0) {
993
0
                continue;
994
0
            }
995
0
            bzstrm.next_in = (char*)inputs[i].data;
996
0
            bzstrm.avail_in = cast_set<uint32_t>(inputs[i].size);
997
0
            int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN;
998
999
0
            bzres = BZ2_bzCompress(&bzstrm, flush);
1000
0
            if (bzres == BZ_PARAM_ERROR) {
1001
0
                return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
1002
0
            } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
1003
0
                       bzres != BZ_STREAM_END && bzres != BZ_OK) {
1004
0
                return Status::InternalError("Failed to init bz2. status code: {}", bzres);
1005
0
            }
1006
0
        }
1007
1008
0
        size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32;
1009
0
        output->resize(total_out);
1010
0
        bzres = BZ2_bzCompressEnd(&bzstrm);
1011
0
        if (bzres == BZ_PARAM_ERROR) {
1012
0
            return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
1013
0
        } else if (bzres != BZ_OK) {
1014
0
            return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
1015
0
        }
1016
0
        return Status::OK();
1017
0
    }
1018
1019
0
    Status decompress(const Slice& input, Slice* output) override {
1020
0
        return Status::InternalError("unimplement: Bzip2BlockCompression::decompress");
1021
0
    }
1022
1023
0
    size_t max_compressed_len(size_t len) override {
1024
        // TODO: make sure the max_compressed_len for bzip2
1025
0
        return len * 2;
1026
0
    }
1027
};
1028
1029
// for ZSTD compression and decompression, with BOTH fast and high compression ratio
//
// Contexts (ZSTD_CCtx / ZSTD_DCtx) are expensive to create, so this codec
// keeps mutex-protected pools of reusable contexts. A compression context
// also carries a scratch buffer that is reused across calls to avoid
// re-allocating the output staging area for typical block sizes.
class ZstdBlockCompression : public BlockCompressionCodec {
private:
    // Pooled compression context: a ZSTD_CCtx plus a reusable scratch buffer.
    // Buffer (de)allocation is charged to the block-compression mem tracker.
    class CContext {
        ENABLE_FACTORY_CREATOR(CContext);

    public:
        CContext() : ctx(nullptr) {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        ZSTD_CCtx* ctx;
        std::unique_ptr<faststring> buffer;
        ~CContext() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                ZSTD_freeCCtx(ctx);
            }
            buffer.reset();
        }
    };
    // Pooled decompression context wrapping a ZSTD_DCtx.
    class DContext {
        ENABLE_FACTORY_CREATOR(DContext);

    public:
        DContext() : ctx(nullptr) {}
        ZSTD_DCtx* ctx;
        ~DContext() {
            if (ctx) {
                ZSTD_freeDCtx(ctx);
            }
        }
    };

public:
    static ZstdBlockCompression* instance() {
        static ZstdBlockCompression s_instance;
        return &s_instance;
    }
    ~ZstdBlockCompression() {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_c_pool.clear();
        _ctx_d_pool.clear();
    }

    size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); }

    // Single-slice compression delegates to the multi-slice overload.
    Status compress(const Slice& input, faststring* output) override {
        std::vector<Slice> inputs {input};
        return compress(inputs, input.size, output);
    }

    // follow ZSTD official example
    //  https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
    //
    // On a hard compression failure, `compress_failed` is set so the Defer
    // does NOT return the context to the pool; the unique_ptr then destroys
    // it (freeing the ZSTD_CCtx) instead of recycling a context in an
    // unknown state.
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        std::unique_ptr<CContext> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            size_t max_len = max_compressed_len(uncompressed_size);
            Slice compressed_buf;
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // use output directly
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                {
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            // set compression level to default 3
            auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel,
                                              ZSTD_CLEVEL_DEFAULT);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }
            // set checksum flag to 1
            ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }

            ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};

            // Stream every input slice through the compressor; the last slice
            // uses ZSTD_e_end to flush and finalize the frame.
            for (size_t i = 0; i < inputs.size(); i++) {
                ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};

                bool last_input = (i == inputs.size() - 1);
                auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;

                bool finished = false;
                do {
                    // do compress
                    ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);

                    if (ZSTD_isError(ret)) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 error: {}",
                                                     ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
                    }

                    // ret is ZSTD hint for needed output buffer size
                    if (ret > 0 && out_buf.pos == out_buf.size) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 output buffer full");
                    }

                    // For the final slice we must loop until the frame is fully
                    // flushed (ret == 0); earlier slices only need their input
                    // fully consumed.
                    finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
                } while (!finished);
            }

            // set compressed size for caller
            output->resize(out_buf.pos);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Data was staged in the context's scratch buffer; copy the
                // compressed bytes into the caller's output.
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
            }
        } catch (std::exception& e) {
            return Status::InternalError("Fail to do ZSTD compress due to exception {}", e.what());
        } catch (...) {
            // Do not set compress_failed to release context
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do ZSTD compress due to exception");
        }

        return Status::OK();
    }

    // Decompress one ZSTD frame into the caller-provided buffer; on success
    // output->size is set to the actual decompressed length. On failure the
    // context is destroyed rather than returned to the pool.
    Status decompress(const Slice& input, Slice* output) override {
        std::unique_ptr<DContext> context;
        bool decompress_failed = false;
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
        Defer defer {[&] {
            if (!decompress_failed) {
                _release_decompression_ctx(std::move(context));
            }
        }};

        size_t ret = ZSTD_decompressDCtx(context->ctx, output->data, output->size, input.data,
                                         input.size);
        if (ZSTD_isError(ret)) {
            decompress_failed = true;
            return Status::InternalError("ZSTD_decompressDCtx error: {}",
                                         ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
        }

        // set decompressed size for caller
        output->size = ret;

        return Status::OK();
    }

private:
    // Hand out a pooled compression context, creating one when the pool is
    // empty.
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        if (_ctx_c_pool.empty()) {
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD CContext");
            }
            //typedef LZ4F_cctx* LZ4F_compressionContext_t;
            localCtx->ctx = ZSTD_createCCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Failed to create ZSTD compress ctx");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_c_pool.back());
        _ctx_c_pool.pop_back();
        return Status::OK();
    }
    // Reset a compression context's session state and return it to the pool.
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
        DCHECK(context);
        auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        _ctx_c_pool.push_back(std::move(context));
    }

    // Hand out a pooled decompression context, creating one when the pool is
    // empty.
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        if (_ctx_d_pool.empty()) {
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD DContext");
            }
            localCtx->ctx = ZSTD_createDCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Fail to init ZSTD decompress context");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_d_pool.back());
        _ctx_d_pool.pop_back();
        return Status::OK();
    }
    // Reset a decompression context's session state and return it to the pool.
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
        DCHECK(context);
        // reset ctx to start a new decompress session
        auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        _ctx_d_pool.push_back(std::move(context));
    }

private:
    // Separate mutexes so compression and decompression pools do not contend.
    mutable std::mutex _ctx_c_mutex;
    mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool;

    mutable std::mutex _ctx_d_mutex;
    mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
1261
1262
class GzipBlockCompression : public ZlibBlockCompression {
1263
public:
1264
0
    static GzipBlockCompression* instance() {
1265
0
        static GzipBlockCompression s_instance;
1266
0
        return &s_instance;
1267
0
    }
1268
0
    ~GzipBlockCompression() override = default;
1269
1270
0
    Status compress(const Slice& input, faststring* output) override {
1271
0
        size_t max_len = max_compressed_len(input.size);
1272
0
        output->resize(max_len);
1273
1274
0
        z_stream z_strm = {};
1275
0
        z_strm.zalloc = Z_NULL;
1276
0
        z_strm.zfree = Z_NULL;
1277
0
        z_strm.opaque = Z_NULL;
1278
1279
0
        int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
1280
0
                                8, Z_DEFAULT_STRATEGY);
1281
1282
0
        if (zres == Z_MEM_ERROR) {
1283
0
            throw Exception(Status::MemoryLimitExceeded(
1284
0
                    "Fail to init ZLib compress, error={}, res={}", zError(zres), zres));
1285
0
        } else if (zres != Z_OK) {
1286
0
            return Status::InternalError("Fail to init ZLib compress, error={}, res={}",
1287
0
                                         zError(zres), zres);
1288
0
        }
1289
1290
0
        z_strm.next_in = (Bytef*)input.get_data();
1291
0
        z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.get_size());
1292
0
        z_strm.next_out = (Bytef*)output->data();
1293
0
        z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size());
1294
1295
0
        zres = deflate(&z_strm, Z_FINISH);
1296
0
        if (zres != Z_OK && zres != Z_STREAM_END) {
1297
0
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
1298
0
                                         zError(zres), zres);
1299
0
        }
1300
1301
0
        output->resize(z_strm.total_out);
1302
0
        zres = deflateEnd(&z_strm);
1303
0
        if (zres == Z_DATA_ERROR) {
1304
0
            return Status::InvalidArgument("Fail to end zlib compress");
1305
0
        } else if (zres != Z_OK) {
1306
0
            return Status::InternalError("Fail to end zlib compress");
1307
0
        }
1308
0
        return Status::OK();
1309
0
    }
1310
1311
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
1312
0
                    faststring* output) override {
1313
0
        size_t max_len = max_compressed_len(uncompressed_size);
1314
0
        output->resize(max_len);
1315
1316
0
        z_stream zstrm;
1317
0
        zstrm.zalloc = Z_NULL;
1318
0
        zstrm.zfree = Z_NULL;
1319
0
        zstrm.opaque = Z_NULL;
1320
0
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
1321
0
                                 8, Z_DEFAULT_STRATEGY);
1322
0
        if (zres == Z_MEM_ERROR) {
1323
0
            throw Exception(Status::MemoryLimitExceeded(
1324
0
                    "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres));
1325
0
        } else if (zres != Z_OK) {
1326
0
            return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}",
1327
0
                                         zError(zres), zres);
1328
0
        }
1329
1330
        // we assume that output is e
1331
0
        zstrm.next_out = (Bytef*)output->data();
1332
0
        zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size());
1333
0
        for (int i = 0; i < inputs.size(); ++i) {
1334
0
            if (inputs[i].size == 0) {
1335
0
                continue;
1336
0
            }
1337
0
            zstrm.next_in = (Bytef*)inputs[i].data;
1338
0
            zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size);
1339
0
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;
1340
1341
0
            zres = deflate(&zstrm, flush);
1342
0
            if (zres != Z_OK && zres != Z_STREAM_END) {
1343
0
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
1344
0
                                             zError(zres), zres);
1345
0
            }
1346
0
        }
1347
1348
0
        output->resize(zstrm.total_out);
1349
0
        zres = deflateEnd(&zstrm);
1350
0
        if (zres == Z_DATA_ERROR) {
1351
0
            return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
1352
0
                                           zError(zres), zres);
1353
0
        } else if (zres != Z_OK) {
1354
0
            return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
1355
0
                                         zError(zres), zres);
1356
0
        }
1357
0
        return Status::OK();
1358
0
    }
1359
1360
0
    Status decompress(const Slice& input, Slice* output) override {
1361
0
        z_stream z_strm = {};
1362
0
        z_strm.zalloc = Z_NULL;
1363
0
        z_strm.zfree = Z_NULL;
1364
0
        z_strm.opaque = Z_NULL;
1365
1366
0
        int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
1367
0
        if (ret != Z_OK) {
1368
0
            return Status::InternalError("Fail to init ZLib decompress, error={}, res={}",
1369
0
                                         zError(ret), ret);
1370
0
        }
1371
1372
        // 1. set input and output
1373
0
        z_strm.next_in = reinterpret_cast<Bytef*>(input.data);
1374
0
        z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.size);
1375
0
        z_strm.next_out = reinterpret_cast<Bytef*>(output->data);
1376
0
        z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size);
1377
1378
0
        if (z_strm.avail_out > 0) {
1379
            // We only support non-streaming use case  for block decompressor
1380
0
            ret = inflate(&z_strm, Z_FINISH);
1381
0
            if (ret != Z_OK && ret != Z_STREAM_END) {
1382
0
                (void)inflateEnd(&z_strm);
1383
0
                if (ret == Z_MEM_ERROR) {
1384
0
                    throw Exception(Status::MemoryLimitExceeded(
1385
0
                            "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret));
1386
0
                } else if (ret == Z_DATA_ERROR) {
1387
0
                    return Status::InvalidArgument(
1388
0
                            "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret);
1389
0
                }
1390
0
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
1391
0
                                             zError(ret), ret);
1392
0
            }
1393
0
        }
1394
0
        (void)inflateEnd(&z_strm);
1395
1396
0
        return Status::OK();
1397
0
    }
1398
1399
0
    size_t max_compressed_len(size_t len) override {
1400
0
        z_stream zstrm;
1401
0
        zstrm.zalloc = Z_NULL;
1402
0
        zstrm.zfree = Z_NULL;
1403
0
        zstrm.opaque = Z_NULL;
1404
0
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
1405
0
                                 MEM_LEVEL, Z_DEFAULT_STRATEGY);
1406
0
        if (zres != Z_OK) {
1407
            // Fall back to zlib estimate logic for deflate, notice this may
1408
            // cause decompress error
1409
0
            LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres)
1410
0
                         << ", res=" << zres;
1411
0
            return ZlibBlockCompression::max_compressed_len(len);
1412
0
        } else {
1413
0
            zres = deflateEnd(&zstrm);
1414
0
            if (zres != Z_OK) {
1415
0
                LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres)
1416
0
                             << ", res=" << zres;
1417
0
            }
1418
            // Mark, maintainer of zlib, has stated that 12 needs to be added to
1419
            // result for gzip
1420
            // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
1421
            // To have a safe upper bound for "wrapper variations", we add 32 to
1422
            // estimate
1423
0
            auto upper_bound = deflateBound(&zstrm, len) + 32;
1424
0
            return upper_bound;
1425
0
        }
1426
0
    }
1427
1428
private:
1429
    // Magic number for zlib, see https://zlib.net/manual.html for more details.
1430
    const static int GZIP_CODEC = 16; // gzip
1431
    // The memLevel parameter specifies how much memory should be allocated for
1432
    // the internal compression state.
1433
    const static int MEM_LEVEL = 8;
1434
};
1435
1436
// Only used on x86 or x86_64
1437
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
1438
        defined(__i386) || defined(_M_IX86)
1439
class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression {
1440
public:
1441
0
    GzipBlockCompressionByLibdeflate() : GzipBlockCompression() {}
1442
0
    static GzipBlockCompressionByLibdeflate* instance() {
1443
0
        static GzipBlockCompressionByLibdeflate s_instance;
1444
0
        return &s_instance;
1445
0
    }
1446
0
    ~GzipBlockCompressionByLibdeflate() override = default;
1447
1448
0
    Status decompress(const Slice& input, Slice* output) override {
1449
0
        if (input.empty()) {
1450
0
            output->size = 0;
1451
0
            return Status::OK();
1452
0
        }
1453
0
        thread_local std::unique_ptr<libdeflate_decompressor, void (*)(libdeflate_decompressor*)>
1454
0
                decompressor {libdeflate_alloc_decompressor(), libdeflate_free_decompressor};
1455
0
        if (!decompressor) {
1456
0
            return Status::InternalError("libdeflate_alloc_decompressor error.");
1457
0
        }
1458
0
        std::size_t out_len;
1459
0
        auto result = libdeflate_gzip_decompress(decompressor.get(), input.data, input.size,
1460
0
                                                 output->data, output->size, &out_len);
1461
0
        if (result != LIBDEFLATE_SUCCESS) {
1462
0
            return Status::InternalError("libdeflate_gzip_decompress error, res={}", result);
1463
0
        }
1464
0
        return Status::OK();
1465
0
    }
1466
};
1467
#endif
1468
1469
class LzoBlockCompression final : public BlockCompressionCodec {
1470
public:
1471
0
    static LzoBlockCompression* instance() {
1472
0
        static LzoBlockCompression s_instance;
1473
0
        return &s_instance;
1474
0
    }
1475
1476
0
    Status compress(const Slice& input, faststring* output) override {
1477
0
        return Status::InvalidArgument("not impl lzo compress.");
1478
0
    }
1479
0
    size_t max_compressed_len(size_t len) override { return 0; };
1480
0
    Status decompress(const Slice& input, Slice* output) override {
1481
0
        auto* input_ptr = input.data;
1482
0
        auto remain_input_size = input.size;
1483
0
        auto* output_ptr = output->data;
1484
0
        auto remain_output_size = output->size;
1485
0
        auto* output_limit = output->data + output->size;
1486
1487
        // Example:
1488
        // OriginData(The original data will be divided into several large data block.) :
1489
        //      large data block1 | large data block2 | large data block3 | ....
1490
        // The large data block will be divided into several small data block.
1491
        // Suppose a large data block is divided into three small blocks:
1492
        // large data block1:            | small block1 | small block2 | small block3 |
1493
        // CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block1) ] [B3 compress(small block1)]>
1494
        //
1495
        // A : original length of the current block of large data block.
1496
        // sizeof(A) = 4 bytes.
1497
        // A = length(small block1) + length(small block2) + length(small block3)
1498
        // Bx : length of  small data block bx.
1499
        // sizeof(Bx) = 4 bytes.
1500
        // Bx = length(compress(small blockx))
1501
0
        try {
1502
0
            while (remain_input_size > 0) {
1503
0
                if (remain_input_size < 4) {
1504
0
                    return Status::InvalidArgument(
1505
0
                            "Need more input buffer to get large_block_uncompressed_len.");
1506
0
                }
1507
1508
0
                uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr);
1509
0
                input_ptr += 4;
1510
0
                remain_input_size -= 4;
1511
1512
0
                if (remain_output_size < large_block_uncompressed_len) {
1513
0
                    return Status::InvalidArgument(
1514
0
                            "Need more output buffer to get uncompressed data.");
1515
0
                }
1516
1517
0
                while (large_block_uncompressed_len > 0) {
1518
0
                    if (remain_input_size < 4) {
1519
0
                        return Status::InvalidArgument(
1520
0
                                "Need more input buffer to get small_block_compressed_len.");
1521
0
                    }
1522
1523
0
                    uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr);
1524
0
                    input_ptr += 4;
1525
0
                    remain_input_size -= 4;
1526
1527
0
                    if (remain_input_size < small_block_compressed_len) {
1528
0
                        return Status::InvalidArgument(
1529
0
                                "Need more input buffer to decompress small block.");
1530
0
                    }
1531
1532
0
                    auto small_block_uncompressed_len =
1533
0
                            orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len,
1534
0
                                               output_ptr, output_limit);
1535
1536
0
                    input_ptr += small_block_compressed_len;
1537
0
                    remain_input_size -= small_block_compressed_len;
1538
1539
0
                    output_ptr += small_block_uncompressed_len;
1540
0
                    large_block_uncompressed_len -= small_block_uncompressed_len;
1541
0
                    remain_output_size -= small_block_uncompressed_len;
1542
0
                }
1543
0
            }
1544
0
        } catch (const orc::ParseError& e) {
1545
            //Prevent be from hanging due to orc::lzoDecompress throw exception
1546
0
            return Status::InternalError("Fail to do LZO decompress, error={}", e.what());
1547
0
        }
1548
0
        return Status::OK();
1549
0
    }
1550
};
1551
1552
class BrotliBlockCompression final : public BlockCompressionCodec {
1553
public:
1554
0
    static BrotliBlockCompression* instance() {
1555
0
        static BrotliBlockCompression s_instance;
1556
0
        return &s_instance;
1557
0
    }
1558
1559
0
    Status compress(const Slice& input, faststring* output) override {
1560
0
        return Status::InvalidArgument("not impl brotli compress.");
1561
0
    }
1562
1563
0
    size_t max_compressed_len(size_t len) override { return 0; };
1564
1565
0
    Status decompress(const Slice& input, Slice* output) override {
1566
        // The size of output buffer is always equal to the umcompressed length.
1567
0
        BrotliDecoderResult result = BrotliDecoderDecompress(
1568
0
                input.get_size(), reinterpret_cast<const uint8_t*>(input.get_data()), &output->size,
1569
0
                reinterpret_cast<uint8_t*>(output->data));
1570
0
        if (result != BROTLI_DECODER_RESULT_SUCCESS) {
1571
0
            return Status::InternalError("Brotli decompression failed, result={}", result);
1572
0
        }
1573
0
        return Status::OK();
1574
0
    }
1575
};
1576
1577
// Resolve a segment_v2 compression type to its process-wide singleton codec.
// NO_COMPRESSION maps to a null codec, which callers treat as "pass through".
Status get_block_compression_codec(segment_v2::CompressionTypePB type,
                                   BlockCompressionCodec** codec) {
    switch (type) {
    case segment_v2::CompressionTypePB::NO_COMPRESSION:
        *codec = nullptr;
        return Status::OK();
    case segment_v2::CompressionTypePB::SNAPPY:
        *codec = SnappyBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4:
        *codec = Lz4BlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4F:
        *codec = Lz4fBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::LZ4HC:
        *codec = Lz4HCBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::ZLIB:
        *codec = ZlibBlockCompression::instance();
        return Status::OK();
    case segment_v2::CompressionTypePB::ZSTD:
        *codec = ZstdBlockCompression::instance();
        return Status::OK();
    default:
        return Status::InternalError("unknown compression type({})", type);
    }
}
1607
1608
// this can only be used in hive text write
1609
0
// Resolve a TFileCompressType to its singleton codec.
// PLAIN maps to a null codec, which callers treat as "pass through".
// this can only be used in hive text write
Status get_block_compression_codec(TFileCompressType::type type, BlockCompressionCodec** codec) {
    switch (type) {
    case TFileCompressType::PLAIN:
        *codec = nullptr;
        break;
    case TFileCompressType::ZLIB:
        *codec = ZlibBlockCompression::instance();
        break;
    case TFileCompressType::GZ:
        *codec = GzipBlockCompression::instance();
        break;
    case TFileCompressType::BZ2:
        *codec = Bzip2BlockCompression::instance();
        break;
    case TFileCompressType::LZ4BLOCK:
        *codec = HadoopLz4BlockCompression::instance();
        break;
    case TFileCompressType::SNAPPYBLOCK:
        *codec = HadoopSnappyBlockCompression::instance();
        break;
    case TFileCompressType::ZSTD:
        *codec = ZstdBlockCompression::instance();
        break;
    default:
        // Fixed typos in the user-visible message ("unsupport ... int" ->
        // "unsupported ... in").
        return Status::InternalError("unsupported compression type({}) in hive text", type);
    }

    return Status::OK();
}
1638
1639
// Resolve a parquet codec id to its process-wide singleton codec.
// UNCOMPRESSED maps to a null codec, which callers treat as "pass through".
Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec,
                                   BlockCompressionCodec** codec) {
    switch (parquet_codec) {
    case tparquet::CompressionCodec::UNCOMPRESSED:
        *codec = nullptr;
        return Status::OK();
    case tparquet::CompressionCodec::SNAPPY:
        *codec = SnappyBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::LZ4_RAW: // we can use LZ4 compression algorithm parse LZ4_RAW
    case tparquet::CompressionCodec::LZ4:
        *codec = HadoopLz4BlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::ZSTD:
        *codec = ZstdBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::GZIP:
// Only used on x86 or x86_64
#if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \
        defined(__i386) || defined(_M_IX86)
        *codec = GzipBlockCompressionByLibdeflate::instance();
#else
        *codec = GzipBlockCompression::instance();
#endif
        return Status::OK();
    case tparquet::CompressionCodec::LZO:
        *codec = LzoBlockCompression::instance();
        return Status::OK();
    case tparquet::CompressionCodec::BROTLI:
        *codec = BrotliBlockCompression::instance();
        return Status::OK();
    default:
        return Status::InternalError("unknown compression type({})", parquet_codec);
    }
}
1676
1677
#include "common/compile_check_end.h"
1678
} // namespace doris