/root/doris/be/src/util/block_compression.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "util/block_compression.h" |
19 | | |
20 | | #include <bzlib.h> |
21 | | #include <gen_cpp/parquet_types.h> |
22 | | #include <gen_cpp/segment_v2.pb.h> |
23 | | #include <glog/logging.h> |
24 | | |
25 | | #include <exception> |
26 | | // Only used on x86 or x86_64 |
27 | | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
28 | | defined(__i386) || defined(_M_IX86) |
29 | | #include <libdeflate.h> |
30 | | #endif |
31 | | #include <brotli/decode.h> |
32 | | #include <glog/log_severity.h> |
34 | | #include <lz4/lz4.h> |
35 | | #include <lz4/lz4frame.h> |
36 | | #include <lz4/lz4hc.h> |
37 | | #include <snappy/snappy-sinksource.h> |
38 | | #include <snappy/snappy.h> |
39 | | #include <zconf.h> |
40 | | #include <zlib.h> |
41 | | #include <zstd.h> |
42 | | #include <zstd_errors.h> |
43 | | |
44 | | #include <algorithm> |
45 | | #include <cstdint> |
46 | | #include <limits> |
47 | | #include <mutex> |
48 | | #include <orc/Exceptions.hh> |
49 | | #include <ostream> |
50 | | |
51 | | #include "common/config.h" |
52 | | #include "common/factory_creator.h" |
53 | | #include "exec/decompressor.h" |
54 | | #include "gutil/endian.h" |
55 | | #include "gutil/strings/substitute.h" |
56 | | #include "runtime/thread_context.h" |
57 | | #include "util/defer_op.h" |
58 | | #include "util/faststring.h" |
59 | | |
60 | | namespace orc { |
61 | | /** |
62 | | * Decompress the bytes in to the output buffer. |
63 | | * @param inputAddress the start of the input |
64 | | * @param inputLimit one past the last byte of the input |
65 | | * @param outputAddress the start of the output buffer |
66 | | * @param outputLimit one past the last byte of the output buffer |
67 | | * @result the number of bytes decompressed |
68 | | */ |
69 | | uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress, |
70 | | char* outputLimit); |
71 | | } // namespace orc |
72 | | |
73 | | namespace doris { |
74 | | |
75 | | // exception safe |
76 | | Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
77 | 2 | faststring* output) { |
78 | 2 | faststring buf; |
79 |  |  // compute the total size up front to avoid extra memory copies |
80 | 2 | buf.reserve(uncompressed_size); |
81 | 10 | for (auto& input : inputs) { |
82 | 10 | buf.append(input.data, input.size); |
83 | 10 | } |
84 | 2 | return compress(buf, output); |
85 | 2 | } |
86 | | |
87 | 11.1k | bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) { |
88 | 11.1k | return uncompressed_size > std::numeric_limits<int32_t>::max(); |
89 | 11.1k | } |
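A minimal round-trip sketch of the codec contract defined above (hypothetical caller code, not part of this file): raw block codecs such as LZ4 do not store the uncompressed size, so the caller pre-sizes the decompress output and decompress() shrinks output->size to the actual length.

    // Hedged usage sketch; assumes a codec instance such as Lz4BlockCompression below.
    Status roundtrip_example(BlockCompressionCodec* codec, const Slice& raw) {
        faststring compressed;
        RETURN_IF_ERROR(codec->compress(raw, &compressed));
        faststring plain_buf;
        plain_buf.resize(raw.size);  // caller must know the original size
        Slice plain(reinterpret_cast<char*>(plain_buf.data()), plain_buf.size());
        RETURN_IF_ERROR(codec->decompress(
                Slice(reinterpret_cast<const char*>(compressed.data()), compressed.size()),
                &plain));
        return plain.size == raw.size ? Status::OK()
                                      : Status::InternalError("size mismatch");
    }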
90 | | |
91 | | class Lz4BlockCompression : public BlockCompressionCodec { |
92 | | private: |
93 | | class Context { |
94 | | ENABLE_FACTORY_CREATOR(Context); |
95 | | |
96 | | public: |
97 | 1 | Context() : ctx(nullptr) { |
98 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
99 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
100 | 1 | buffer = std::make_unique<faststring>(); |
101 | 1 | } |
102 | | LZ4_stream_t* ctx; |
103 | | std::unique_ptr<faststring> buffer; |
104 | 1 | ~Context() { |
105 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
106 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
107 | 1 | if (ctx) { |
108 | 1 | LZ4_freeStream(ctx); |
109 | 1 | } |
110 | 1 | buffer.reset(); |
111 | 1 | } |
112 | | }; |
113 | | |
114 | | public: |
115 | 59 | static Lz4BlockCompression* instance() { |
116 | 59 | static Lz4BlockCompression s_instance; |
117 | 59 | return &s_instance; |
118 | 59 | } |
119 | 1 | ~Lz4BlockCompression() override { |
120 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
121 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
122 | 1 | _ctx_pool.clear(); |
123 | 1 | } |
124 | | |
125 | 44 | Status compress(const Slice& input, faststring* output) override { |
126 | 44 | if (input.size > LZ4_MAX_INPUT_SIZE) { |
127 | 0 | return Status::InvalidArgument( |
128 | 0 | "LZ4 not support those case(input.size>LZ4_MAX_INPUT_SIZE), maybe you should " |
129 | 0 | "change " |
130 | 0 | "fragment_transmission_compression_codec to snappy, input.size={}, " |
131 | 0 | "LZ4_MAX_INPUT_SIZE={}", |
132 | 0 | input.size, LZ4_MAX_INPUT_SIZE); |
133 | 0 | } |
134 | | |
135 | 44 | std::unique_ptr<Context> context; |
136 | 44 | RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
137 | 44 | bool compress_failed = false; |
138 | 44 | Defer defer {[&] { |
139 | 44 | if (!compress_failed) { |
140 | 44 | _release_compression_ctx(std::move(context)); |
141 | 44 | } |
142 | 44 | }}; |
143 | | |
144 | 44 | try { |
145 | 44 | Slice compressed_buf; |
146 | 44 | size_t max_len = max_compressed_len(input.size); |
147 | 44 | if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
148 | | // use output directly |
149 | 0 | output->resize(max_len); |
150 | 0 | compressed_buf.data = reinterpret_cast<char*>(output->data()); |
151 | 0 | compressed_buf.size = max_len; |
152 | 44 | } else { |
153 |  |  // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE |
154 | 44 | { |
155 |  |  // context->buffer is reusable across queries, so it should be accounted |
156 |  |  // to the global tracker. |
157 | 44 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
158 | 44 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
159 | 44 | context->buffer->resize(max_len); |
160 | 44 | } |
161 | 44 | compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
162 | 44 | compressed_buf.size = max_len; |
163 | 44 | } |
164 | | |
165 | 44 | size_t compressed_len = |
166 | 44 | LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data, |
167 | 44 | input.size, compressed_buf.size, ACCELERATION); |
168 | 44 | if (compressed_len == 0) { |
169 | 0 | compress_failed = true; |
170 | 0 | return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", |
171 | 0 | compressed_buf.size); |
172 | 0 | } |
173 | 44 | output->resize(compressed_len); |
174 | 44 | if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
175 | 44 | output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), |
176 | 44 | compressed_len); |
177 | 44 | } |
178 | 44 | } catch (...) { |
179 |  |  // leave compress_failed false so the Defer still releases the context |
180 | 0 | DCHECK(!compress_failed); |
181 | 0 | return Status::InternalError("Fail to do LZ4Block compress due to exception"); |
182 | 0 | } |
183 | 44 | return Status::OK(); |
184 | 44 | } |
185 | | |
186 | 30 | Status decompress(const Slice& input, Slice* output) override { |
187 | 30 | auto decompressed_len = |
188 | 30 | LZ4_decompress_safe(input.data, output->data, input.size, output->size); |
189 | 30 | if (decompressed_len < 0) { |
190 | 5 | return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len); |
191 | 5 | } |
192 | 25 | output->size = decompressed_len; |
193 | 25 | return Status::OK(); |
194 | 30 | } |
195 | | |
196 | 44 | size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); } |
197 | | |
198 | | private: |
199 | | // reuse LZ4 compress stream |
200 | 44 | Status _acquire_compression_ctx(std::unique_ptr<Context>& out) { |
201 | 44 | std::lock_guard<std::mutex> l(_ctx_mutex); |
202 | 44 | if (_ctx_pool.empty()) { |
203 | 1 | std::unique_ptr<Context> localCtx = Context::create_unique(); |
204 | 1 | if (localCtx.get() == nullptr) { |
205 | 0 | return Status::InvalidArgument("new LZ4 context error"); |
206 | 0 | } |
207 | 1 | localCtx->ctx = LZ4_createStream(); |
208 | 1 | if (localCtx->ctx == nullptr) { |
209 | 0 | return Status::InvalidArgument("LZ4_createStream error"); |
210 | 0 | } |
211 | 1 | out = std::move(localCtx); |
212 | 1 | return Status::OK(); |
213 | 1 | } |
214 | 43 | out = std::move(_ctx_pool.back()); |
215 | 43 | _ctx_pool.pop_back(); |
216 | 43 | return Status::OK(); |
217 | 44 | } |
218 | 44 | void _release_compression_ctx(std::unique_ptr<Context> context) { |
219 | 44 | DCHECK(context); |
220 | 44 | LZ4_resetStream(context->ctx); |
221 | 44 | std::lock_guard<std::mutex> l(_ctx_mutex); |
222 | 44 | _ctx_pool.push_back(std::move(context)); |
223 | 44 | } |
224 | | |
225 | | private: |
226 | | mutable std::mutex _ctx_mutex; |
227 | | mutable std::vector<std::unique_ptr<Context>> _ctx_pool; |
228 |  |  static const int32_t ACCELERATION = 1; |
229 | | }; |
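For reference, the same contract expressed with the raw LZ4 C API, independent of the stream pooling above (a self-contained sketch): it shows why decompress() needs a caller-supplied size, since LZ4 blocks carry no length header.

    #include <lz4/lz4.h>  // header path as used at the top of this file
    #include <cstdlib>
    #include <cstring>

    // Sketch: one-shot LZ4 block round trip; returns 0 on success.
    int lz4_block_roundtrip(const char* src, int src_len) {
        int bound = LZ4_compressBound(src_len);  // worst-case compressed size
        char* comp = static_cast<char*>(malloc(bound));
        char* back = static_cast<char*>(malloc(src_len));
        int comp_len = LZ4_compress_default(src, comp, src_len, bound);
        // src_len must be tracked externally: LZ4 blocks have no length header.
        int back_len = LZ4_decompress_safe(comp, back, comp_len, src_len);
        bool ok = comp_len > 0 && back_len == src_len && memcmp(src, back, src_len) == 0;
        free(comp);
        free(back);
        return ok ? 0 : -1;
    }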
230 | | |
231 | | class HadoopLz4BlockCompression : public Lz4BlockCompression { |
232 | | public: |
233 | 0 | HadoopLz4BlockCompression() { |
234 | 0 | Status st = Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor); |
235 | 0 | if (!st.ok()) { |
236 | 0 | LOG(FATAL) << "HadoopLz4BlockCompression construction failed. status = " << st << "\n"; |
237 | 0 | } |
238 | 0 | } |
239 | | |
240 | 0 | ~HadoopLz4BlockCompression() override = default; |
241 | | |
242 | 0 | static HadoopLz4BlockCompression* instance() { |
243 | 0 | static HadoopLz4BlockCompression s_instance; |
244 | 0 | return &s_instance; |
245 | 0 | } |
246 | | |
247 |  |  // Hadoop uses its own block framing for LZ4: |
248 | | // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc |
249 | 0 | Status compress(const Slice& input, faststring* output) override { |
250 |  |  // match Hadoop's behavior: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java |
251 | 0 | size_t lz4_block_size = config::lz4_compression_block_size; |
252 | 0 | size_t overhead = lz4_block_size / 255 + 16; |
253 | 0 | size_t max_input_size = lz4_block_size - overhead; |
254 |  |  |
255 | 0 | size_t data_len = input.size; |
256 | 0 | char* data = input.data; |
257 | 0 | std::vector<OwnedSlice> buffers; |
258 | 0 | size_t out_len = 0; |
259 |  |  |
260 | 0 | while (data_len > 0) { |
261 | 0 | size_t input_size = std::min(data_len, max_input_size); |
262 | 0 | Slice input_slice(data, input_size); |
263 | 0 | faststring output_data; |
264 | 0 | RETURN_IF_ERROR(Lz4BlockCompression::compress(input_slice, &output_data)); |
265 | 0 | out_len += output_data.size(); |
266 | 0 | buffers.push_back(output_data.build()); |
267 | 0 | data += input_size; |
268 | 0 | data_len -= input_size; |
269 | 0 | } |
270 | | |
271 |  |  // hadoop block compression: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ... |
272 | 0 | size_t total_output_len = 4 + 4 * buffers.size() + out_len; |
273 | 0 | output->resize(total_output_len); |
274 | 0 | char* output_buffer = (char*)output->data(); |
275 | 0 | BigEndian::Store32(output_buffer, input.get_size()); |
276 | 0 | output_buffer += 4; |
277 | 0 | for (const auto& buffer : buffers) { |
278 | 0 | auto slice = buffer.slice(); |
279 | 0 | BigEndian::Store32(output_buffer, slice.get_size()); |
280 | 0 | output_buffer += 4; |
281 | 0 | memcpy(output_buffer, slice.get_data(), slice.get_size()); |
282 | 0 | output_buffer += slice.get_size(); |
283 | 0 | } |
284 |  |  |
285 | 0 | DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len); |
286 |  |  |
287 | 0 | return Status::OK(); |
288 | 0 | } |
289 | | |
290 | 0 | Status decompress(const Slice& input, Slice* output) override { |
291 | 0 | size_t input_bytes_read = 0; |
292 | 0 | size_t decompressed_len = 0; |
293 | 0 | size_t more_input_bytes = 0; |
294 | 0 | size_t more_output_bytes = 0; |
295 | 0 | bool stream_end = false; |
296 | 0 | auto st = _decompressor->decompress((uint8_t*)input.data, input.size, &input_bytes_read, |
297 | 0 | (uint8_t*)output->data, output->size, &decompressed_len, |
298 | 0 | &stream_end, &more_input_bytes, &more_output_bytes); |
299 |  |  // try to decompress as Hadoop-framed LZ4; on failure, fall back to raw LZ4 |
300 | 0 | return (st != Status::OK() || stream_end != true) |
301 | 0 | ? Lz4BlockCompression::decompress(input, output) |
302 | 0 | : Status::OK(); |
303 | 0 | } |
304 | | |
305 | | private: |
306 | | std::unique_ptr<Decompressor> _decompressor; |
307 | | }; |
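The Hadoop framing produced above is nothing more than big-endian 32-bit length prefixes. A minimal sketch of the byte layout, with a hypothetical stand-in for BigEndian::Store32:

    #include <cstdint>

    // Hypothetical helper mirroring BigEndian::Store32: most-significant byte first.
    static void store32_be(char* p, uint32_t v) {
        p[0] = static_cast<char>(v >> 24);
        p[1] = static_cast<char>(v >> 16);
        p[2] = static_cast<char>(v >> 8);
        p[3] = static_cast<char>(v);
    }
    // Resulting stream, as emitted chunk by chunk in compress() above:
    // [uncompressed_length][compressed_length1][data1][compressed_length2][data2]...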
308 |  | // Implements the LZ4 frame format; decompression is roughly twice as fast as raw LZ4 blocks. |
309 | | class Lz4fBlockCompression : public BlockCompressionCodec { |
310 | | private: |
311 | | class CContext { |
312 | | ENABLE_FACTORY_CREATOR(CContext); |
313 | | |
314 | | public: |
315 | 1 | CContext() : ctx(nullptr) { |
316 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
317 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
318 | 1 | buffer = std::make_unique<faststring>(); |
319 | 1 | } |
320 | | LZ4F_compressionContext_t ctx; |
321 | | std::unique_ptr<faststring> buffer; |
322 | 1 | ~CContext() { |
323 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
324 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
325 | 1 | if (ctx) { |
326 | 1 | LZ4F_freeCompressionContext(ctx); |
327 | 1 | } |
328 | 1 | buffer.reset(); |
329 | 1 | } |
330 | | }; |
331 | | class DContext { |
332 | | ENABLE_FACTORY_CREATOR(DContext); |
333 | | |
334 | | public: |
335 | 6 | DContext() : ctx(nullptr) {} |
336 | | LZ4F_decompressionContext_t ctx; |
337 | 6 | ~DContext() { |
338 | 6 | if (ctx) { |
339 | 6 | LZ4F_freeDecompressionContext(ctx); |
340 | 6 | } |
341 | 6 | } |
342 | | }; |
343 | | |
344 | | public: |
345 | 20.3k | static Lz4fBlockCompression* instance() { |
346 | 20.3k | static Lz4fBlockCompression s_instance; |
347 | 20.3k | return &s_instance; |
348 | 20.3k | } |
349 | 1 | ~Lz4fBlockCompression() { |
350 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
351 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
352 | 1 | _ctx_c_pool.clear(); |
353 | 1 | _ctx_d_pool.clear(); |
354 | 1 | } |
355 | | |
356 | 5 | Status compress(const Slice& input, faststring* output) override { |
357 | 5 | std::vector<Slice> inputs {input}; |
358 | 5 | return compress(inputs, input.size, output); |
359 | 5 | } |
360 | | |
361 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
362 | 11.0k | faststring* output) override { |
363 | 11.0k | return _compress(inputs, uncompressed_size, output); |
364 | 11.0k | } |
365 | | |
366 | 4.99k | Status decompress(const Slice& input, Slice* output) override { |
367 | 4.99k | return _decompress(input, output); |
368 | 4.99k | } |
369 | | |
370 | 11.0k | size_t max_compressed_len(size_t len) override { |
371 | 11.0k | return std::max(LZ4F_compressBound(len, &_s_preferences), |
372 | 11.0k | LZ4F_compressFrameBound(len, &_s_preferences)); |
373 | 11.0k | } |
374 | | |
375 | | private: |
376 | | Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
377 | 11.0k | faststring* output) { |
378 | 11.0k | std::unique_ptr<CContext> context; |
379 | 11.0k | RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
380 | 11.0k | bool compress_failed = false; |
381 | 11.0k | Defer defer {[&] { |
382 | 11.0k | if (!compress_failed) { |
383 | 11.0k | _release_compression_ctx(std::move(context)); |
384 | 11.0k | } |
385 | 11.0k | }}; |
386 | | |
387 | 11.0k | try { |
388 | 11.0k | Slice compressed_buf; |
389 | 11.0k | size_t max_len = max_compressed_len(uncompressed_size); |
390 | 11.0k | if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
391 | | // use output directly |
392 | 0 | output->resize(max_len); |
393 | 0 | compressed_buf.data = reinterpret_cast<char*>(output->data()); |
394 | 0 | compressed_buf.size = max_len; |
395 | 11.0k | } else { |
396 | 11.0k | { |
397 | 11.0k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
398 | 11.0k | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
399 |  |  // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE |
400 | 11.0k | context->buffer->resize(max_len); |
401 | 11.0k | } |
402 | 11.0k | compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
403 | 11.0k | compressed_buf.size = max_len; |
404 | 11.0k | } |
405 | | |
406 | 11.0k | auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size, |
407 | 11.0k | &_s_preferences); |
408 | 11.0k | if (LZ4F_isError(wbytes)) { |
409 | 0 | compress_failed = true; |
410 | 0 | return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}", |
411 | 0 | LZ4F_getErrorName(wbytes)); |
412 | 0 | } |
413 | 11.0k | size_t offset = wbytes; |
414 | 11.0k | for (auto input : inputs) { |
415 | 11.0k | wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset, |
416 | 11.0k | compressed_buf.size - offset, input.data, input.size, |
417 | 11.0k | nullptr); |
418 | 11.0k | if (LZ4F_isError(wbytes)) { |
419 | 0 | compress_failed = true; |
420 | 0 | return Status::InvalidArgument("Fail to do LZ4F compress update, res={}", |
421 | 0 | LZ4F_getErrorName(wbytes)); |
422 | 0 | } |
423 | 11.0k | offset += wbytes; |
424 | 11.0k | } |
425 | 11.0k | wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset, |
426 | 11.0k | compressed_buf.size - offset, nullptr); |
427 | 11.0k | if (LZ4F_isError(wbytes)) { |
428 | 0 | compress_failed = true; |
429 | 0 | return Status::InvalidArgument("Fail to do LZ4F compress end, res={}", |
430 | 0 | LZ4F_getErrorName(wbytes)); |
431 | 0 | } |
432 | 11.0k | offset += wbytes; |
433 | 11.0k | output->resize(offset); |
434 | 11.0k | if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
435 | 11.0k | output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset); |
436 | 11.0k | } |
437 | 11.0k | } catch (...) { |
438 |  |  // leave compress_failed false so the Defer still releases the context |
439 | 0 | DCHECK(!compress_failed); |
440 | 0 | return Status::InternalError("Fail to do LZ4F compress due to exception"); |
441 | 0 | } |
442 | | |
443 | 11.0k | return Status::OK(); |
444 | 11.0k | } |
445 | | |
446 | 4.99k | Status _decompress(const Slice& input, Slice* output) { |
447 | 4.99k | bool decompress_failed = false; |
448 | 4.99k | std::unique_ptr<DContext> context; |
449 | 4.99k | RETURN_IF_ERROR(_acquire_decompression_ctx(context)); |
450 | 4.99k | Defer defer {[&] { |
451 | 4.99k | if (!decompress_failed) { |
452 | 4.98k | _release_decompression_ctx(std::move(context)); |
453 | 4.98k | } |
454 | 4.99k | }}; |
455 | 4.99k | size_t input_size = input.size; |
456 | 4.99k | auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data, |
457 | 4.99k | &input_size, nullptr); |
458 | 4.99k | if (LZ4F_isError(lres)) { |
459 | 0 | decompress_failed = true; |
460 | 0 | return Status::InvalidArgument("Fail to do LZ4F decompress, res={}", |
461 | 0 | LZ4F_getErrorName(lres)); |
462 | 4.99k | } else if (input_size != input.size) { |
463 | 0 | decompress_failed = true; |
464 | 0 | return Status::InvalidArgument( |
465 | 0 | strings::Substitute("Fail to do LZ4F decompress: trailing data left in " |
466 | 0 | "compressed data, read=$0 vs given=$1", |
467 | 0 | input_size, input.size)); |
468 | 4.99k | } else if (lres != 0) { |
469 | 5 | decompress_failed = true; |
470 | 5 | return Status::InvalidArgument( |
471 | 5 | "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres); |
472 | 5 | } |
473 | 4.98k | return Status::OK(); |
474 | 4.99k | } |
475 | | |
476 | | private: |
477 |  |  // acquire a compression ctx from the pool; release it back when compression |
478 |  |  // finishes, or drop it if compression failed |
479 | 11.0k | Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) { |
480 | 11.0k | std::lock_guard<std::mutex> l(_ctx_c_mutex); |
481 | 11.0k | if (_ctx_c_pool.empty()) { |
482 | 1 | std::unique_ptr<CContext> localCtx = CContext::create_unique(); |
483 | 1 | if (localCtx.get() == nullptr) { |
484 | 0 | return Status::InvalidArgument("failed to new LZ4F CContext"); |
485 | 0 | } |
486 | 1 | auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION); |
487 | 1 | if (LZ4F_isError(res) != 0) { |
488 | 0 | return Status::InvalidArgument(strings::Substitute( |
489 | 0 | "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res))); |
490 | 0 | } |
491 | 1 | out = std::move(localCtx); |
492 | 1 | return Status::OK(); |
493 | 1 | } |
494 | 11.0k | out = std::move(_ctx_c_pool.back()); |
495 | 11.0k | _ctx_c_pool.pop_back(); |
496 | 11.0k | return Status::OK(); |
497 | 11.0k | } |
498 | 11.0k | void _release_compression_ctx(std::unique_ptr<CContext> context) { |
499 | 11.0k | DCHECK(context); |
500 | 11.0k | std::lock_guard<std::mutex> l(_ctx_c_mutex); |
501 | 11.0k | _ctx_c_pool.push_back(std::move(context)); |
502 | 11.0k | } |
503 | | |
504 | 4.99k | Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) { |
505 | 4.99k | std::lock_guard<std::mutex> l(_ctx_d_mutex); |
506 | 4.99k | if (_ctx_d_pool.empty()) { |
507 | 6 | std::unique_ptr<DContext> localCtx = DContext::create_unique(); |
508 | 6 | if (localCtx.get() == nullptr) { |
509 | 0 | return Status::InvalidArgument("failed to new LZ4F DContext"); |
510 | 0 | } |
511 | 6 | auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION); |
512 | 6 | if (LZ4F_isError(res) != 0) { |
513 | 0 | return Status::InvalidArgument(strings::Substitute( |
514 | 0 | "LZ4F_createDecompressionContext error, res=$0", LZ4F_getErrorName(res))); |
515 | 0 | } |
516 | 6 | out = std::move(localCtx); |
517 | 6 | return Status::OK(); |
518 | 6 | } |
519 | 4.98k | out = std::move(_ctx_d_pool.back()); |
520 | 4.98k | _ctx_d_pool.pop_back(); |
521 | 4.98k | return Status::OK(); |
522 | 4.99k | } |
523 | 4.98k | void _release_decompression_ctx(std::unique_ptr<DContext> context) { |
524 | 4.98k | DCHECK(context); |
525 | | // reset decompression context to avoid ERROR_maxBlockSize_invalid |
526 | 4.98k | LZ4F_resetDecompressionContext(context->ctx); |
527 | 4.98k | std::lock_guard<std::mutex> l(_ctx_d_mutex); |
528 | 4.98k | _ctx_d_pool.push_back(std::move(context)); |
529 | 4.98k | } |
530 | | |
531 | | private: |
532 | | static LZ4F_preferences_t _s_preferences; |
533 | | |
534 | | std::mutex _ctx_c_mutex; |
535 | | // LZ4F_compressionContext_t is a pointer so no copy here |
536 | | std::vector<std::unique_ptr<CContext>> _ctx_c_pool; |
537 | | |
538 | | std::mutex _ctx_d_mutex; |
539 | | std::vector<std::unique_ptr<DContext>> _ctx_d_pool; |
540 | | }; |
541 | | |
542 | | LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = { |
543 | | {LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0ULL, 0U, |
544 | | LZ4F_noBlockChecksum}, |
545 | | 0, |
546 | | 0u, |
547 | | 0u, |
548 | | {0u, 0u, 0u}}; |
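The Begin/Update/End sequence in _compress() can be collapsed into LZ4F's one-shot frame helper when the input is contiguous; a sketch using the same preferences struct:

    #include <lz4/lz4frame.h>
    #include <vector>

    // Sketch: one-shot LZ4 frame compression with explicit preferences.
    std::vector<char> lz4f_compress_once(const char* src, size_t src_len,
                                         const LZ4F_preferences_t& prefs) {
        std::vector<char> dst(LZ4F_compressFrameBound(src_len, &prefs));
        size_t n = LZ4F_compressFrame(dst.data(), dst.size(), src, src_len, &prefs);
        if (LZ4F_isError(n)) return {};  // real code would surface LZ4F_getErrorName(n)
        dst.resize(n);
        return dst;
    }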
549 | | |
550 | | class Lz4HCBlockCompression : public BlockCompressionCodec { |
551 | | private: |
552 | | class Context { |
553 | | ENABLE_FACTORY_CREATOR(Context); |
554 | | |
555 | | public: |
556 | 1 | Context() : ctx(nullptr) { |
557 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
558 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
559 | 1 | buffer = std::make_unique<faststring>(); |
560 | 1 | } |
561 | | LZ4_streamHC_t* ctx; |
562 | | std::unique_ptr<faststring> buffer; |
563 | 1 | ~Context() { |
564 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
565 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
566 | 1 | if (ctx) { |
567 | 1 | LZ4_freeStreamHC(ctx); |
568 | 1 | } |
569 | 1 | buffer.reset(); |
570 | 1 | } |
571 | | }; |
572 | | |
573 | | public: |
574 | 2 | static Lz4HCBlockCompression* instance() { |
575 | 2 | static Lz4HCBlockCompression s_instance; |
576 | 2 | return &s_instance; |
577 | 2 | } |
578 | 1 | ~Lz4HCBlockCompression() { |
579 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
580 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
581 | 1 | _ctx_pool.clear(); |
582 | 1 | } |
583 | | |
584 | 6 | Status compress(const Slice& input, faststring* output) override { |
585 | 6 | std::unique_ptr<Context> context; |
586 | 6 | RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
587 | 6 | bool compress_failed = false; |
588 | 6 | Defer defer {[&] { |
589 | 6 | if (!compress_failed) { |
590 | 6 | _release_compression_ctx(std::move(context)); |
591 | 6 | } |
592 | 6 | }}; |
593 | | |
594 | 6 | try { |
595 | 6 | Slice compressed_buf; |
596 | 6 | size_t max_len = max_compressed_len(input.size); |
597 | 6 | if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
598 | | // use output directly |
599 | 0 | output->resize(max_len); |
600 | 0 | compressed_buf.data = reinterpret_cast<char*>(output->data()); |
601 | 0 | compressed_buf.size = max_len; |
602 | 6 | } else { |
603 | 6 | { |
604 | 6 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
605 | 6 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
606 |  |  // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE |
607 | 6 | context->buffer->resize(max_len); |
608 | 6 | } |
609 | 6 | compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
610 | 6 | compressed_buf.size = max_len; |
611 | 6 | } |
612 | | |
613 | 6 | size_t compressed_len = LZ4_compress_HC_continue( |
614 | 6 | context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size); |
615 | 6 | if (compressed_len == 0) { |
616 | 0 | compress_failed = true; |
617 | 0 | return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", |
618 | 0 | compressed_buf.size); |
619 | 0 | } |
620 | 6 | output->resize(compressed_len); |
621 | 6 | if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
622 | 6 | output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), |
623 | 6 | compressed_len); |
624 | 6 | } |
625 | 6 | } catch (...) { |
626 |  |  // leave compress_failed false so the Defer still releases the context |
627 | 0 | DCHECK(!compress_failed); |
628 | 0 | return Status::InternalError("Fail to do LZ4HC compress due to exception"); |
629 | 0 | } |
630 | 6 | return Status::OK(); |
631 | 6 | } |
632 | | |
633 | 11 | Status decompress(const Slice& input, Slice* output) override { |
634 | 11 | auto decompressed_len = |
635 | 11 | LZ4_decompress_safe(input.data, output->data, input.size, output->size); |
636 | 11 | if (decompressed_len < 0) { |
637 | 5 | return Status::InvalidArgument("fail to do LZ4 decompress, error={}", decompressed_len); |
638 | 5 | } |
639 | 6 | output->size = decompressed_len; |
640 | 6 | return Status::OK(); |
641 | 11 | } |
642 | | |
643 | 6 | size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); } |
644 | | |
645 | | private: |
646 | 6 | Status _acquire_compression_ctx(std::unique_ptr<Context>& out) { |
647 | 6 | std::lock_guard<std::mutex> l(_ctx_mutex); |
648 | 6 | if (_ctx_pool.empty()) { |
649 | 1 | std::unique_ptr<Context> localCtx = Context::create_unique(); |
650 | 1 | if (localCtx.get() == nullptr) { |
651 | 0 | return Status::InvalidArgument("new LZ4HC context error"); |
652 | 0 | } |
653 | 1 | localCtx->ctx = LZ4_createStreamHC(); |
654 | 1 | if (localCtx->ctx == nullptr) { |
655 | 0 | return Status::InvalidArgument("LZ4_createStreamHC error"); |
656 | 0 | } |
657 | 1 | out = std::move(localCtx); |
658 | 1 | return Status::OK(); |
659 | 1 | } |
660 | 5 | out = std::move(_ctx_pool.back()); |
661 | 5 | _ctx_pool.pop_back(); |
662 | 5 | return Status::OK(); |
663 | 6 | } |
664 | 6 | void _release_compression_ctx(std::unique_ptr<Context> context) { |
665 | 6 | DCHECK(context); |
666 | 6 | LZ4_resetStreamHC_fast(context->ctx, _compression_level); |
667 | 6 | std::lock_guard<std::mutex> l(_ctx_mutex); |
668 | 6 | _ctx_pool.push_back(std::move(context)); |
669 | 6 | } |
670 | | |
671 | | private: |
672 | | int64_t _compression_level = config::LZ4_HC_compression_level; |
673 | | mutable std::mutex _ctx_mutex; |
674 | | mutable std::vector<std::unique_ptr<Context>> _ctx_pool; |
675 | | }; |
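The HC variant trades compression speed for a better ratio while keeping the same block format, which is why decompress() above can reuse LZ4_decompress_safe. Without the stream pooling, the one-shot equivalent is (sketch):

    #include <lz4/lz4hc.h>

    // Sketch: one-shot LZ4 HC compression; LZ4HC_CLEVEL_DEFAULT is level 9.
    int lz4hc_compress_once(const char* src, int src_len, char* dst, int dst_cap,
                            int level = LZ4HC_CLEVEL_DEFAULT) {
        // Output is a standard LZ4 block, readable by LZ4_decompress_safe.
        return LZ4_compress_HC(src, dst, src_len, dst_cap, level);
    }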
676 | | |
677 | | class SnappySlicesSource : public snappy::Source { |
678 | | public: |
679 | | SnappySlicesSource(const std::vector<Slice>& slices) |
680 | 1 | : _available(0), _cur_slice(0), _slice_off(0) { |
681 | 5 | for (auto& slice : slices) { |
682 |  |  // Filter out empty slices here to simplify later processing |
683 | 5 | if (slice.size == 0) { |
684 | 1 | continue; |
685 | 1 | } |
686 | 4 | _available += slice.size; |
687 | 4 | _slices.push_back(slice); |
688 | 4 | } |
689 | 1 | } |
690 | 1 | ~SnappySlicesSource() override {} |
691 | | |
692 | | // Return the number of bytes left to read from the source |
693 | 1 | size_t Available() const override { return _available; } |
694 | | |
695 | | // Peek at the next flat region of the source. Does not reposition |
696 | | // the source. The returned region is empty iff Available()==0. |
697 | | // |
698 | | // Returns a pointer to the beginning of the region and store its |
699 | | // length in *len. |
700 | | // |
701 | | // The returned region is valid until the next call to Skip() or |
702 | | // until this object is destroyed, whichever occurs first. |
703 | | // |
704 | | // The returned region may be larger than Available() (for example |
705 | | // if this ByteSource is a view on a substring of a larger source). |
706 | | // The caller is responsible for ensuring that it only reads the |
707 | | // Available() bytes. |
708 | 19 | const char* Peek(size_t* len) override { |
709 | 19 | if (_available == 0) { |
710 | 0 | *len = 0; |
711 | 0 | return nullptr; |
712 | 0 | } |
713 |  |  // ensure that *len is not 0 |
714 | 19 | *len = _slices[_cur_slice].size - _slice_off; |
715 | 19 | DCHECK(*len != 0); |
716 | 19 | return _slices[_cur_slice].data + _slice_off; |
717 | 19 | } |
718 | | |
719 | | // Skip the next n bytes. Invalidates any buffer returned by |
720 | | // a previous call to Peek(). |
721 | | // REQUIRES: Available() >= n |
722 | 20 | void Skip(size_t n) override { |
723 | 20 | _available -= n; |
724 | 24 | while (n > 0) { |
725 | 19 | auto left = _slices[_cur_slice].size - _slice_off; |
726 | 19 | if (left > n) { |
727 |  |  // n can be consumed within the current slice |
728 | 15 | _slice_off += n; |
729 | 15 | return; |
730 | 15 | } |
731 | 4 | _slice_off = 0; |
732 | 4 | _cur_slice++; |
733 | 4 | n -= left; |
734 | 4 | } |
735 | 20 | } |
736 | | |
737 | | private: |
738 | | std::vector<Slice> _slices; |
739 | | size_t _available; |
740 | | size_t _cur_slice; |
741 | | size_t _slice_off; |
742 | | }; |
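SnappySlicesSource exists so snappy can stream from scattered buffers without first concatenating them. For a single contiguous buffer, snappy's stock Source/Sink pair gives the same shape (sketch):

    #include <snappy/snappy.h>
    #include <snappy/snappy-sinksource.h>

    // Sketch: contiguous-buffer analogue of SnappySlicesSource + UncheckedByteArraySink.
    // dst must be pre-sized to snappy::MaxCompressedLength(len).
    size_t snappy_compress_contiguous(const char* src, size_t len, char* dst) {
        snappy::ByteArraySource source(src, len);
        snappy::UncheckedByteArraySink sink(dst);
        return snappy::Compress(&source, &sink);  // returns the compressed length
    }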
743 | | |
744 | | class SnappyBlockCompression : public BlockCompressionCodec { |
745 | | public: |
746 | 86 | static SnappyBlockCompression* instance() { |
747 | 86 | static SnappyBlockCompression s_instance; |
748 | 86 | return &s_instance; |
749 | 86 | } |
750 | 1 | ~SnappyBlockCompression() override = default; |
751 | | |
752 | 34 | Status compress(const Slice& input, faststring* output) override { |
753 | 34 | size_t max_len = max_compressed_len(input.size); |
754 | 34 | output->resize(max_len); |
755 | 34 | Slice s(*output); |
756 | | |
757 | 34 | snappy::RawCompress(input.data, input.size, s.data, &s.size); |
758 | 34 | output->resize(s.size); |
759 | 34 | return Status::OK(); |
760 | 34 | } |
761 | | |
762 | 76 | Status decompress(const Slice& input, Slice* output) override { |
763 | 76 | if (!snappy::RawUncompress(input.data, input.size, output->data)) { |
764 | 0 | return Status::InvalidArgument("Fail to do Snappy decompress"); |
765 | 0 | } |
766 | | // NOTE: GetUncompressedLength only takes O(1) time |
767 | 76 | snappy::GetUncompressedLength(input.data, input.size, &output->size); |
768 | 76 | return Status::OK(); |
769 | 76 | } |
770 | | |
771 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
772 | 1 | faststring* output) override { |
773 | 1 | auto max_len = max_compressed_len(uncompressed_size); |
774 | 1 | output->resize(max_len); |
775 | | |
776 | 1 | SnappySlicesSource source(inputs); |
777 | 1 | snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data())); |
778 | 1 | output->resize(snappy::Compress(&source, &sink)); |
779 | 1 | return Status::OK(); |
780 | 1 | } |
781 | | |
782 | 35 | size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); } |
783 | | }; |
784 | | |
785 | | class HadoopSnappyBlockCompression : public SnappyBlockCompression { |
786 | | public: |
787 | 0 | static HadoopSnappyBlockCompression* instance() { |
788 | 0 | static HadoopSnappyBlockCompression s_instance; |
789 | 0 | return &s_instance; |
790 | 0 | } |
791 | 0 | ~HadoopSnappyBlockCompression() override = default; |
792 | | |
793 |  |  // Hadoop uses its own block framing for Snappy: |
794 | | // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc |
795 | 0 | Status compress(const Slice& input, faststring* output) override { |
796 |  |  // match Hadoop's behavior: https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java |
797 | 0 | size_t snappy_block_size = config::snappy_compression_block_size; |
798 | 0 | size_t overhead = snappy_block_size / 6 + 32; |
799 | 0 | size_t max_input_size = snappy_block_size - overhead; |
800 |  |  |
801 | 0 | size_t data_len = input.size; |
802 | 0 | char* data = input.data; |
803 | 0 | std::vector<OwnedSlice> buffers; |
804 | 0 | size_t out_len = 0; |
805 |  |  |
806 | 0 | while (data_len > 0) { |
807 | 0 | size_t input_size = std::min(data_len, max_input_size); |
808 | 0 | Slice input_slice(data, input_size); |
809 | 0 | faststring output_data; |
810 | 0 | RETURN_IF_ERROR(SnappyBlockCompression::compress(input_slice, &output_data)); |
811 | 0 | out_len += output_data.size(); |
812 | | // the OwnedSlice will be moved here |
813 | 0 | buffers.push_back(output_data.build()); |
814 | 0 | data += input_size; |
815 | 0 | data_len -= input_size; |
816 | 0 | } |
817 | | |
818 |  |  // hadoop block compression: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ... |
819 | 0 | size_t total_output_len = 4 + 4 * buffers.size() + out_len; |
820 | 0 | output->resize(total_output_len); |
821 | 0 | char* output_buffer = (char*)output->data(); |
822 | 0 | BigEndian::Store32(output_buffer, input.get_size()); |
823 | 0 | output_buffer += 4; |
824 | 0 | for (const auto& buffer : buffers) { |
825 | 0 | auto slice = buffer.slice(); |
826 | 0 | BigEndian::Store32(output_buffer, slice.get_size()); |
827 | 0 | output_buffer += 4; |
828 | 0 | memcpy(output_buffer, slice.get_data(), slice.get_size()); |
829 | 0 | output_buffer += slice.get_size(); |
830 | 0 | } |
831 |  |  |
832 | 0 | DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len); |
833 |  |  |
834 | 0 | return Status::OK(); |
835 | 0 | } |
836 | | |
837 | 0 | Status decompress(const Slice& input, Slice* output) override { |
838 | 0 | return Status::InternalError("unimplemented: HadoopSnappyBlockCompression::decompress"); |
839 | 0 | } |
840 | | }; |
841 | | |
842 | | class ZlibBlockCompression : public BlockCompressionCodec { |
843 | | public: |
844 | 2 | static ZlibBlockCompression* instance() { |
845 | 2 | static ZlibBlockCompression s_instance; |
846 | 2 | return &s_instance; |
847 | 2 | } |
848 | 1 | ~ZlibBlockCompression() override = default; |
849 | | |
850 | 5 | Status compress(const Slice& input, faststring* output) override { |
851 | 5 | size_t max_len = max_compressed_len(input.size); |
852 | 5 | output->resize(max_len); |
853 | 5 | Slice s(*output); |
854 | | |
855 | 5 | auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size); |
856 | 5 | if (zres != Z_OK) { |
857 | 0 | return Status::InvalidArgument("Fail to do ZLib compress, error={}", zError(zres)); |
858 | 0 | } |
859 | 5 | output->resize(s.size); |
860 | 5 | return Status::OK(); |
861 | 5 | } |
862 | | |
863 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
864 | 1 | faststring* output) override { |
865 | 1 | size_t max_len = max_compressed_len(uncompressed_size); |
866 | 1 | output->resize(max_len); |
867 | | |
868 | 1 | z_stream zstrm; |
869 | 1 | zstrm.zalloc = Z_NULL; |
870 | 1 | zstrm.zfree = Z_NULL; |
871 | 1 | zstrm.opaque = Z_NULL; |
872 | 1 | auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION); |
873 | 1 | if (zres != Z_OK) { |
874 | 0 | return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", |
875 | 0 | zError(zres), zres); |
876 | 0 | } |
877 |  |  // we assume the output buffer is large enough (resized to max_len above) |
878 | 1 | zstrm.next_out = (Bytef*)output->data(); |
879 | 1 | zstrm.avail_out = output->size(); |
880 | 6 | for (int i = 0; i < inputs.size(); ++i) { |
881 | 5 | if (inputs[i].size == 0) { |
882 | 1 | continue; |
883 | 1 | } |
884 | 4 | zstrm.next_in = (Bytef*)inputs[i].data; |
885 | 4 | zstrm.avail_in = inputs[i].size; |
886 | 4 | int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH; |
887 | | |
888 | 4 | zres = deflate(&zstrm, flush); |
889 | 4 | if (zres != Z_OK && zres != Z_STREAM_END) { |
890 | 0 | return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", |
891 | 0 | zError(zres), zres); |
892 | 0 | } |
893 | 4 | } |
894 | | |
895 | 1 | output->resize(zstrm.total_out); |
896 | 1 | zres = deflateEnd(&zstrm); |
897 | 1 | if (zres != Z_OK) { |
898 | 0 | return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
899 | 0 | zError(zres), zres); |
900 | 0 | } |
901 | 1 | return Status::OK(); |
902 | 1 | } |
903 | | |
904 | 14 | Status decompress(const Slice& input, Slice* output) override { |
905 | 14 | size_t input_size = input.size; |
906 | 14 | auto zres = |
907 | 14 | ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); |
908 | 14 | if (zres != Z_OK) { |
909 | 8 | return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres)); |
910 | 8 | } |
911 | 6 | return Status::OK(); |
912 | 14 | } |
913 | | |
914 | 6 | size_t max_compressed_len(size_t len) override { |
915 | | // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block |
916 | 6 | return len + 6 + 5 * ((len >> 14) + 1); |
917 | 6 | } |
918 | | }; |
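The hand-rolled bound in max_compressed_len() mirrors zlib's documented deflate overhead; zlib also exposes its own worst-case estimate, which stays at or above that formula for default settings (sketch):

    #include <zlib.h>

    // Sketch: zlib's canonical worst-case output size for a one-shot ::compress().
    // For default settings this evaluates to
    //   len + (len >> 12) + (len >> 14) + (len >> 25) + 13.
    size_t zlib_bound(size_t len) {
        return compressBound(static_cast<uLong>(len));
    }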
919 | | |
920 | | class Bzip2BlockCompression : public BlockCompressionCodec { |
921 | | public: |
922 | 0 | static Bzip2BlockCompression* instance() { |
923 | 0 | static Bzip2BlockCompression s_instance; |
924 | 0 | return &s_instance; |
925 | 0 | } |
926 | 0 | ~Bzip2BlockCompression() override = default; |
927 | | |
928 | 0 | Status compress(const Slice& input, faststring* output) override { |
929 | 0 | size_t max_len = max_compressed_len(input.size); |
930 | 0 | output->resize(max_len); |
931 | 0 | uint32_t size = output->size(); |
932 | 0 | auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data, |
933 | 0 | input.size, 9, 0, 0); |
934 | 0 | if (bzres != BZ_OK) { |
935 | 0 | return Status::InternalError("Fail to do Bzip2 compress, ret={}", bzres); |
936 | 0 | } |
937 | 0 | output->resize(size); |
938 | 0 | return Status::OK(); |
939 | 0 | } |
940 | | |
941 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
942 | 0 | faststring* output) override { |
943 | 0 | size_t max_len = max_compressed_len(uncompressed_size); |
944 | 0 | output->resize(max_len); |
945 |  |  |
946 | 0 | bz_stream bzstrm; |
947 | 0 | bzero(&bzstrm, sizeof(bzstrm)); |
948 | 0 | int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0); |
949 | 0 | if (bzres != BZ_OK) { |
950 | 0 | return Status::InternalError("Failed to init bz2. status code: {}", bzres); |
951 | 0 | } |
952 |  |  // we assume the output buffer is large enough (resized to max_len above) |
953 | 0 | bzstrm.next_out = (char*)output->data(); |
954 | 0 | bzstrm.avail_out = output->size(); |
955 | 0 | for (int i = 0; i < inputs.size(); ++i) { |
956 | 0 | if (inputs[i].size == 0) { |
957 | 0 | continue; |
958 | 0 | } |
959 | 0 | bzstrm.next_in = (char*)inputs[i].data; |
960 | 0 | bzstrm.avail_in = inputs[i].size; |
961 | 0 | int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN; |
962 |  |  |
963 | 0 | bzres = BZ2_bzCompress(&bzstrm, flush); |
964 | 0 | if (bzres != BZ_OK && bzres != BZ_STREAM_END) { |
965 | 0 | return Status::InternalError("Fail to do bzip2 stream compress, res={}", bzres); |
966 | 0 | } |
967 | 0 | } |
968 | | |
969 | 0 | size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32; |
970 | 0 | output->resize(total_out); |
971 | 0 | bzres = BZ2_bzCompressEnd(&bzstrm); |
972 | 0 | if (bzres != BZ_OK) { |
973 | 0 | return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres); |
974 | 0 | } |
975 | 0 | return Status::OK(); |
976 | 0 | } |
977 | | |
978 | 0 | Status decompress(const Slice& input, Slice* output) override { |
979 | 0 | return Status::InternalError("unimplemented: Bzip2BlockCompression::decompress"); |
980 | 0 | } |
981 | | |
982 | 0 | size_t max_compressed_len(size_t len) override { |
983 |  |  // TODO: tighten this bound; the bzlib manual suggests len + len / 100 + 600 |
984 | 0 | return len * 2; |
985 | 0 | } |
986 | | }; |
987 | | |
988 |  | // ZSTD compression and decompression: both fast and a high compression ratio |
989 | | class ZstdBlockCompression : public BlockCompressionCodec { |
990 | | private: |
991 | | class CContext { |
992 | | ENABLE_FACTORY_CREATOR(CContext); |
993 | | |
994 | | public: |
995 | 1 | CContext() : ctx(nullptr) { |
996 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
997 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
998 | 1 | buffer = std::make_unique<faststring>(); |
999 | 1 | } |
1000 | | ZSTD_CCtx* ctx; |
1001 | | std::unique_ptr<faststring> buffer; |
1002 | 1 | ~CContext() { |
1003 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
1004 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
1005 | 1 | if (ctx) { |
1006 | 1 | ZSTD_freeCCtx(ctx); |
1007 | 1 | } |
1008 | 1 | buffer.reset(); |
1009 | 1 | } |
1010 | | }; |
1011 | | class DContext { |
1012 | | ENABLE_FACTORY_CREATOR(DContext); |
1013 | | |
1014 | | public: |
1015 | 6 | DContext() : ctx(nullptr) {} |
1016 | | ZSTD_DCtx* ctx; |
1017 | 6 | ~DContext() { |
1018 | 6 | if (ctx) { |
1019 | 6 | ZSTD_freeDCtx(ctx); |
1020 | 6 | } |
1021 | 6 | } |
1022 | | }; |
1023 | | |
1024 | | public: |
1025 | 169 | static ZstdBlockCompression* instance() { |
1026 | 169 | static ZstdBlockCompression s_instance; |
1027 | 169 | return &s_instance; |
1028 | 169 | } |
1029 | 1 | ~ZstdBlockCompression() { |
1030 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
1031 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
1032 | 1 | _ctx_c_pool.clear(); |
1033 | 1 | _ctx_d_pool.clear(); |
1034 | 1 | } |
1035 | | |
1036 | 141 | size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); } |
1037 | | |
1038 | 7 | Status compress(const Slice& input, faststring* output) override { |
1039 | 7 | std::vector<Slice> inputs {input}; |
1040 | 7 | return compress(inputs, input.size, output); |
1041 | 7 | } |
1042 | | |
1043 | | // follow ZSTD official example |
1044 | | // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c |
1045 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
1046 | 141 | faststring* output) override { |
1047 | 141 | std::unique_ptr<CContext> context; |
1048 | 141 | RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
1049 | 141 | bool compress_failed = false; |
1050 | 141 | Defer defer {[&] { |
1051 | 141 | if (!compress_failed) { |
1052 | 141 | _release_compression_ctx(std::move(context)); |
1053 | 141 | } |
1054 | 141 | }}; |
1055 | | |
1056 | 141 | try { |
1057 | 141 | size_t max_len = max_compressed_len(uncompressed_size); |
1058 | 141 | Slice compressed_buf; |
1059 | 141 | if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
1060 | | // use output directly |
1061 | 0 | output->resize(max_len); |
1062 | 0 | compressed_buf.data = reinterpret_cast<char*>(output->data()); |
1063 | 0 | compressed_buf.size = max_len; |
1064 | 141 | } else { |
1065 | 141 | { |
1066 | 141 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
1067 | 141 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
1068 |  |  // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE |
1069 | 141 | context->buffer->resize(max_len); |
1070 | 141 | } |
1071 | 141 | compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
1072 | 141 | compressed_buf.size = max_len; |
1073 | 141 | } |
1074 | | |
1075 | | // set compression level to default 3 |
1076 | 141 | auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, |
1077 | 141 | ZSTD_CLEVEL_DEFAULT); |
1078 | 141 | if (ZSTD_isError(ret)) { |
1079 | 0 | return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}", |
1080 | 0 | ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
1081 | 0 | } |
1082 | | // set checksum flag to 1 |
1083 | 141 | ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1); |
1084 | 141 | if (ZSTD_isError(ret)) { |
1085 | 0 | return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}", |
1086 | 0 | ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
1087 | 0 | } |
1088 | | |
1089 | 141 | ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0}; |
1090 | | |
1091 | 286 | for (size_t i = 0; i < inputs.size(); i++) { |
1092 | 145 | ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0}; |
1093 | | |
1094 | 145 | bool last_input = (i == inputs.size() - 1); |
1095 | 145 | auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue; |
1096 | | |
1097 | 145 | bool finished = false; |
1098 | 145 | do { |
1099 | | // do compress |
1100 | 145 | auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode); |
1101 | | |
1102 | 145 | if (ZSTD_isError(ret)) { |
1103 | 0 | compress_failed = true; |
1104 | 0 | return Status::InvalidArgument("ZSTD_compressStream2 error: {}", |
1105 | 0 | ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
1106 | 0 | } |
1107 | | |
1108 |  |  // ret is the minimum number of bytes ZSTD still needs to flush |
1109 | 145 | if (ret > 0 && out_buf.pos == out_buf.size) { |
1110 | 0 | compress_failed = true; |
1111 | 0 | return Status::InvalidArgument("ZSTD_compressStream2 output buffer full"); |
1112 | 0 | } |
1113 | | |
1114 | 145 | finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size); |
1115 | 145 | } while (!finished); |
1116 | 145 | } |
1117 | | |
1118 | | // set compressed size for caller |
1119 | 141 | output->resize(out_buf.pos); |
1120 | 141 | if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
1121 | 141 | output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos); |
1122 | 141 | } |
1123 | 141 | } catch (std::exception& e) { |
1124 | 0 | return Status::InternalError("Fail to do ZSTD compress due to exception {}", e.what()); |
1125 | 0 | } catch (...) { |
1126 |  |  // leave compress_failed false so the Defer still releases the context |
1127 | 0 | DCHECK(!compress_failed); |
1128 | 0 | return Status::InternalError("Fail to do ZSTD compress due to exception"); |
1129 | 0 | } |
1130 | | |
1131 | 141 | return Status::OK(); |
1132 | 141 | } |
1133 | | |
1134 | 132 | Status decompress(const Slice& input, Slice* output) override { |
1135 | 132 | std::unique_ptr<DContext> context; |
1136 | 132 | bool decompress_failed = false; |
1137 | 132 | RETURN_IF_ERROR(_acquire_decompression_ctx(context)); |
1138 | 132 | Defer defer {[&] { |
1139 | 132 | if (!decompress_failed) { |
1140 | 127 | _release_decompression_ctx(std::move(context)); |
1141 | 127 | } |
1142 | 132 | }}; |
1143 | | |
1144 | 132 | size_t ret = ZSTD_decompressDCtx(context->ctx, output->data, output->size, input.data, |
1145 | 132 | input.size); |
1146 | 132 | if (ZSTD_isError(ret)) { |
1147 | 5 | decompress_failed = true; |
1148 | 5 | return Status::InvalidArgument("ZSTD_decompressDCtx error: {}", |
1149 | 5 | ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
1150 | 5 | } |
1151 | | |
1152 | | // set decompressed size for caller |
1153 | 127 | output->size = ret; |
1154 | | |
1155 | 127 | return Status::OK(); |
1156 | 132 | } |
1157 | | |
1158 | | private: |
1159 | 141 | Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) { |
1160 | 141 | std::lock_guard<std::mutex> l(_ctx_c_mutex); |
1161 | 141 | if (_ctx_c_pool.empty()) { |
1162 | 1 | std::unique_ptr<CContext> localCtx = CContext::create_unique(); |
1163 | 1 | if (localCtx.get() == nullptr) { |
1164 | 0 | return Status::InvalidArgument("failed to new ZSTD CContext"); |
1165 | 0 | } |
1167 | 1 | localCtx->ctx = ZSTD_createCCtx(); |
1168 | 1 | if (localCtx->ctx == nullptr) { |
1169 | 0 | return Status::InvalidArgument("Failed to create ZSTD compress ctx"); |
1170 | 0 | } |
1171 | 1 | out = std::move(localCtx); |
1172 | 1 | return Status::OK(); |
1173 | 1 | } |
1174 | 140 | out = std::move(_ctx_c_pool.back()); |
1175 | 140 | _ctx_c_pool.pop_back(); |
1176 | 140 | return Status::OK(); |
1177 | 141 | } |
1178 | 141 | void _release_compression_ctx(std::unique_ptr<CContext> context) { |
1179 | 141 | DCHECK(context); |
1180 | 141 | auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only); |
1181 | 141 | DCHECK(!ZSTD_isError(ret)); |
1182 | 141 | std::lock_guard<std::mutex> l(_ctx_c_mutex); |
1183 | 141 | _ctx_c_pool.push_back(std::move(context)); |
1184 | 141 | } |
1185 | | |
1186 | 132 | Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) { |
1187 | 132 | std::lock_guard<std::mutex> l(_ctx_d_mutex); |
1188 | 132 | if (_ctx_d_pool.empty()) { |
1189 | 6 | std::unique_ptr<DContext> localCtx = DContext::create_unique(); |
1190 | 6 | if (localCtx.get() == nullptr) { |
1191 | 0 | return Status::InvalidArgument("failed to new ZSTD DContext"); |
1192 | 0 | } |
1193 | 6 | localCtx->ctx = ZSTD_createDCtx(); |
1194 | 6 | if (localCtx->ctx == nullptr) { |
1195 | 0 | return Status::InvalidArgument("Fail to init ZSTD decompress context"); |
1196 | 0 | } |
1197 | 6 | out = std::move(localCtx); |
1198 | 6 | return Status::OK(); |
1199 | 6 | } |
1200 | 126 | out = std::move(_ctx_d_pool.back()); |
1201 | 126 | _ctx_d_pool.pop_back(); |
1202 | 126 | return Status::OK(); |
1203 | 132 | } |
1204 | 127 | void _release_decompression_ctx(std::unique_ptr<DContext> context) { |
1205 | 127 | DCHECK(context); |
1206 | | // reset ctx to start a new decompress session |
1207 | 127 | auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only); |
1208 | 127 | DCHECK(!ZSTD_isError(ret)); |
1209 | 127 | std::lock_guard<std::mutex> l(_ctx_d_mutex); |
1210 | 127 | _ctx_d_pool.push_back(std::move(context)); |
1211 | 127 | } |
1212 | | |
1213 | | private: |
1214 | | mutable std::mutex _ctx_c_mutex; |
1215 | | mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool; |
1216 | | |
1217 | | mutable std::mutex _ctx_d_mutex; |
1218 | | mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool; |
1219 | | }; |
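Unlike raw LZ4 blocks, a ZSTD frame records its content size in the header, which is why decompress() above can return the exact length. The one-shot API makes this visible (sketch):

    #include <zstd.h>
    #include <vector>

    // Sketch: one-shot ZSTD round trip; the frame header carries the original size.
    std::vector<char> zstd_roundtrip(const char* src, size_t len) {
        std::vector<char> comp(ZSTD_compressBound(len));
        size_t clen = ZSTD_compress(comp.data(), comp.size(), src, len,
                                    ZSTD_CLEVEL_DEFAULT /* level 3, as above */);
        if (ZSTD_isError(clen)) return {};
        // Frames produced by ZSTD_compress always embed the content size.
        unsigned long long plain_len = ZSTD_getFrameContentSize(comp.data(), clen);
        std::vector<char> plain(plain_len);
        size_t dlen = ZSTD_decompress(plain.data(), plain.size(), comp.data(), clen);
        if (ZSTD_isError(dlen) || dlen != len) return {};
        return plain;
    }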
1220 | | |
1221 | | class GzipBlockCompression : public ZlibBlockCompression { |
1222 | | public: |
1223 | 0 | static GzipBlockCompression* instance() { |
1224 | 0 | static GzipBlockCompression s_instance; |
1225 | 0 | return &s_instance; |
1226 | 0 | } |
1227 | 0 | ~GzipBlockCompression() override = default; |
1228 | | |
1229 | 0 | Status compress(const Slice& input, faststring* output) override { |
1230 | 0 | size_t max_len = max_compressed_len(input.size); |
1231 | 0 | output->resize(max_len); |
1232 |  |  |
1233 | 0 | z_stream z_strm = {}; |
1234 | 0 | z_strm.zalloc = Z_NULL; |
1235 | 0 | z_strm.zfree = Z_NULL; |
1236 | 0 | z_strm.opaque = Z_NULL; |
1237 |  |  |
1238 | 0 | int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
1239 | 0 | 8, Z_DEFAULT_STRATEGY); |
1240 |  |  |
1241 | 0 | if (zres != Z_OK) { |
1242 | 0 | return Status::InvalidArgument("Fail to init zlib compress"); |
1243 | 0 | } |
1244 | | |
1245 | 0 | z_strm.next_in = (Bytef*)input.get_data(); |
1246 | 0 | z_strm.avail_in = input.get_size(); |
1247 | 0 | z_strm.next_out = (Bytef*)output->data(); |
1248 | 0 | z_strm.avail_out = output->size(); |
1249 |  |  |
1250 | 0 | zres = deflate(&z_strm, Z_FINISH); |
1251 | 0 | if (zres != Z_OK && zres != Z_STREAM_END) { |
1252 | 0 | return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", |
1253 | 0 | zError(zres), zres); |
1254 | 0 | } |
1255 | | |
1256 | 0 | output->resize(z_strm.total_out); |
1257 | 0 | zres = deflateEnd(&z_strm); |
1258 | 0 | if (zres != Z_OK) { |
1259 | 0 | return Status::InvalidArgument("Fail to end zlib compress"); |
1260 | 0 | } |
1261 | 0 | return Status::OK(); |
1262 | 0 | } |
1263 | | |
1264 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
1265 | 0 | faststring* output) override { |
1266 | 0 | size_t max_len = max_compressed_len(uncompressed_size); |
1267 | 0 | output->resize(max_len); |
1268 |  |  |
1269 | 0 | z_stream zstrm; |
1270 | 0 | zstrm.zalloc = Z_NULL; |
1271 | 0 | zstrm.zfree = Z_NULL; |
1272 | 0 | zstrm.opaque = Z_NULL; |
1273 | 0 | auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
1274 | 0 | 8, Z_DEFAULT_STRATEGY); |
1275 | 0 | if (zres != Z_OK) { |
1276 | 0 | return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", |
1277 | 0 | zError(zres), zres); |
1278 | 0 | } |
1279 |  |  // we assume the output buffer is large enough (resized to max_len above) |
1280 | 0 | zstrm.next_out = (Bytef*)output->data(); |
1281 | 0 | zstrm.avail_out = output->size(); |
1282 | 0 | for (int i = 0; i < inputs.size(); ++i) { |
1283 | 0 | if (inputs[i].size == 0) { |
1284 | 0 | continue; |
1285 | 0 | } |
1286 | 0 | zstrm.next_in = (Bytef*)inputs[i].data; |
1287 | 0 | zstrm.avail_in = inputs[i].size; |
1288 | 0 | int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH; |
1289 |  |  |
1290 | 0 | zres = deflate(&zstrm, flush); |
1291 | 0 | if (zres != Z_OK && zres != Z_STREAM_END) { |
1292 | 0 | return Status::InvalidArgument("Fail to do ZLib stream compress, error={}, res={}", |
1293 | 0 | zError(zres), zres); |
1294 | 0 | } |
1295 | 0 | } |
1296 | | |
1297 | 0 | output->resize(zstrm.total_out); |
1298 | 0 | zres = deflateEnd(&zstrm); |
1299 | 0 | if (zres != Z_OK) { |
1300 | 0 | return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
1301 | 0 | zError(zres), zres); |
1302 | 0 | } |
1303 | 0 | return Status::OK(); |
1304 | 0 | } |
1305 | | |
1306 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1307 | 0 | z_stream z_strm = {}; |
1308 | 0 | z_strm.zalloc = Z_NULL; |
1309 | 0 | z_strm.zfree = Z_NULL; |
1310 | 0 | z_strm.opaque = Z_NULL; |
1311 |  |  |
1312 | 0 | int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC); |
1313 | 0 | if (ret != Z_OK) { |
1314 | 0 | return Status::InternalError("Fail to init ZLib stream decompress, error={}, res={}", |
1315 | 0 | zError(ret), ret); |
1316 | 0 | } |
1317 | | |
1318 | | // 1. set input and output |
1319 | 0 | z_strm.next_in = reinterpret_cast<Bytef*>(input.data); |
1320 | 0 | z_strm.avail_in = input.size; |
1321 | 0 | z_strm.next_out = reinterpret_cast<Bytef*>(output->data); |
1322 | 0 | z_strm.avail_out = output->size; |
1323 |  |  |
1324 | 0 | if (z_strm.avail_out > 0) { |
1325 |  |  // We only support the non-streaming use case for the block decompressor |
1326 | 0 | ret = inflate(&z_strm, Z_FINISH); |
1327 | 0 | if (ret != Z_OK && ret != Z_STREAM_END) { |
1328 | 0 | (void)inflateEnd(&z_strm); |
1329 | 0 | return Status::InternalError("Fail to do ZLib stream decompress, error={}, res={}", |
1330 | 0 | zError(ret), ret); |
1331 | 0 | } |
1332 | 0 | } |
1333 | 0 | (void)inflateEnd(&z_strm); |
1334 |  |  |
1335 | 0 | return Status::OK(); |
1336 | 0 | } |
1337 | | |
1338 | 0 | size_t max_compressed_len(size_t len) override { |
1339 | 0 | z_stream zstrm; |
1340 | 0 | zstrm.zalloc = Z_NULL; |
1341 | 0 | zstrm.zfree = Z_NULL; |
1342 | 0 | zstrm.opaque = Z_NULL; |
1343 | 0 | auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
1344 | 0 | MEM_LEVEL, Z_DEFAULT_STRATEGY); |
1345 | 0 | if (zres != Z_OK) { |
1346 | |             // Fall back to zlib's estimate logic for deflate; note that this
1347 | |             // may cause a decompress error if the buffer proves too small
1348 | 0 |             LOG(WARNING) << "Fail to init ZLib stream, error=" << zError(zres)
1349 | 0 |                          << ", res=" << zres;
1350 | 0 | return ZlibBlockCompression::max_compressed_len(len); |
1351 | 0 | } else { |
1352 | 0 | zres = deflateEnd(&zstrm); |
1353 | 0 | if (zres != Z_OK) { |
1354 | 0 | LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres) |
1355 | 0 | << ", res=" << zres; |
1356 | 0 | } |
1357 | |             // Mark Adler, the maintainer of zlib, has stated that 12 needs to be
1358 | |             // added to the result for gzip:
1359 | |             // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
1360 | |             // To get a safe upper bound across "wrapper variations", we add 32 to
1361 | |             // the estimate
1362 | 0 | int upper_bound = deflateBound(&zstrm, len) + 32; |
1363 | 0 | return upper_bound; |
1364 | 0 | } |
1365 | 0 | } |
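     | | // [Editor's note] deflateBound() is only meaningful for the parameters the
     | | // stream was initialized with, which is why a throwaway z_stream is set up
     | | // above. A hypothetical caller would size its buffer from this bound:
     | | //   faststring out;
     | | //   out.resize(codec->max_compressed_len(raw.size())); // worst case, +32 slack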
1366 | | |
1367 | | private: |
1368 | |     // Adding 16 to windowBits makes zlib use the gzip format; see https://zlib.net/manual.html.
1369 | | const static int GZIP_CODEC = 16; // gzip |
1370 | | // The memLevel parameter specifies how much memory should be allocated for |
1371 | | // the internal compression state. |
1372 | | const static int MEM_LEVEL = 8; |
1373 | | }; |
1374 | | |
1375 | | // Only used on x86 or x86_64 |
1376 | | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
1377 | | defined(__i386) || defined(_M_IX86) |
1378 | | class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression { |
1379 | | public: |
1380 | 0 | GzipBlockCompressionByLibdeflate() : GzipBlockCompression() {} |
1381 | 0 | static GzipBlockCompressionByLibdeflate* instance() { |
1382 | 0 | static GzipBlockCompressionByLibdeflate s_instance; |
1383 | 0 | return &s_instance; |
1384 | 0 | } |
1385 | 0 | ~GzipBlockCompressionByLibdeflate() override = default; |
1386 | | |
1387 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1388 | 0 | if (input.empty()) { |
1389 | 0 | output->size = 0; |
1390 | 0 | return Status::OK(); |
1391 | 0 | } |
1392 | 0 | thread_local std::unique_ptr<libdeflate_decompressor, void (*)(libdeflate_decompressor*)> |
1393 | 0 | decompressor {libdeflate_alloc_decompressor(), libdeflate_free_decompressor}; |
1394 | 0 | if (!decompressor) { |
1395 | 0 | return Status::InternalError("libdeflate_alloc_decompressor error."); |
1396 | 0 | } |
1397 | 0 | std::size_t out_len; |
1398 | 0 | auto result = libdeflate_gzip_decompress(decompressor.get(), input.data, input.size, |
1399 | 0 | output->data, output->size, &out_len); |
1400 | 0 | if (result != LIBDEFLATE_SUCCESS) { |
1401 | 0 | return Status::InternalError("libdeflate_gzip_decompress error, res={}", result); |
1402 | 0 | } |
1403 | 0 | return Status::OK(); |
1404 | 0 | } |
1405 | | }; |
1406 | | #endif |
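     | | // [Editor's sketch, not part of the instrumented source] The matching
     | | // libdeflate one-shot compressor, shown for symmetry with the decompress()
     | | // above (x86 builds only). The calls are libdeflate's public API; the buffer
     | | // and variable names are illustrative:
     | | //   libdeflate_compressor* c = libdeflate_alloc_compressor(6); // level 6
     | | //   size_t bound = libdeflate_gzip_compress_bound(c, in_len);
     | | //   std::vector<char> buf(bound);
     | | //   size_t n = libdeflate_gzip_compress(c, in, in_len, buf.data(), buf.size());
     | | //   libdeflate_free_compressor(c); // n == 0 means the output buffer was too small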
1407 | | |
1408 | | class LzoBlockCompression final : public BlockCompressionCodec { |
1409 | | public: |
1410 | 0 | static LzoBlockCompression* instance() { |
1411 | 0 | static LzoBlockCompression s_instance; |
1412 | 0 | return &s_instance; |
1413 | 0 | } |
1414 | | |
1415 | 0 | Status compress(const Slice& input, faststring* output) override { |
1416 | 0 |         return Status::InvalidArgument("LZO compress is not implemented.");
1417 | 0 | } |
1418 | 0 | size_t max_compressed_len(size_t len) override { return 0; }; |
1419 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1420 | 0 | auto* input_ptr = input.data; |
1421 | 0 | auto remain_input_size = input.size; |
1422 | 0 | auto* output_ptr = output->data; |
1423 | 0 | auto remain_output_size = output->size; |
1424 | 0 | auto* output_limit = output->data + output->size; |
1425 | | |
1426 | |         // Example:
1427 | |         // OriginData (the original data is divided into several large data blocks):
1428 | |         //      large data block1 | large data block2 | large data block3 | ....
1429 | |         // Each large data block is in turn divided into several small data blocks.
1430 | |         // Suppose a large data block is divided into three small blocks:
1431 | |         // large data block1: | small block1 | small block2 | small block3 |
1432 | |         // CompressData: <A [B1 compress(small block1)] [B2 compress(small block2)] [B3 compress(small block3)]>
1433 | |         //
1434 | |         // A : original (uncompressed) length of the current large data block.
1435 | |         //     sizeof(A) = 4 bytes, big-endian.
1436 | |         //     A = length(small block1) + length(small block2) + length(small block3)
1437 | |         // Bx : length of compressed small data block x.
1438 | |         //      sizeof(Bx) = 4 bytes, big-endian.
1439 | |         //      Bx = length(compress(small blockx))
1440 | 0 | try { |
1441 | 0 | while (remain_input_size > 0) { |
1442 | 0 | if (remain_input_size < 4) { |
1443 | 0 | return Status::InvalidArgument( |
1444 | 0 | "Need more input buffer to get large_block_uncompressed_len."); |
1445 | 0 | } |
1446 | | |
1447 | 0 | uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr); |
1448 | 0 | input_ptr += 4; |
1449 | 0 | remain_input_size -= 4; |
1450 | 0 |
1451 | 0 | if (remain_output_size < large_block_uncompressed_len) { |
1452 | 0 | return Status::InvalidArgument( |
1453 | 0 | "Need more output buffer to get uncompressed data."); |
1454 | 0 | } |
1455 | | |
1456 | 0 | while (large_block_uncompressed_len > 0) { |
1457 | 0 | if (remain_input_size < 4) { |
1458 | 0 | return Status::InvalidArgument( |
1459 | 0 | "Need more input buffer to get small_block_compressed_len."); |
1460 | 0 | } |
1461 | | |
1462 | 0 | uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr); |
1463 | 0 | input_ptr += 4; |
1464 | 0 | remain_input_size -= 4; |
1465 | 0 |
1466 | 0 | if (remain_input_size < small_block_compressed_len) { |
1467 | 0 | return Status::InvalidArgument( |
1468 | 0 | "Need more input buffer to decompress small block."); |
1469 | 0 | } |
1470 | | |
1471 | 0 | auto small_block_uncompressed_len = |
1472 | 0 | orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len, |
1473 | 0 | output_ptr, output_limit); |
1474 | 0 |
1475 | 0 | input_ptr += small_block_compressed_len; |
1476 | 0 | remain_input_size -= small_block_compressed_len; |
1477 | 0 |
1478 | 0 | output_ptr += small_block_uncompressed_len; |
1479 | 0 | large_block_uncompressed_len -= small_block_uncompressed_len; |
1480 | 0 | remain_output_size -= small_block_uncompressed_len; |
1481 | 0 | } |
1482 | 0 | } |
1483 | 0 | } catch (const orc::ParseError& e) { |
1484 | |             // Prevent the BE from crashing when orc::lzoDecompress throws an exception
1485 | 0 | return Status::InternalError("Fail to do LZO decompress, error={}", e.what()); |
1486 | 0 | } |
1487 | 0 | return Status::OK(); |
1488 | 0 | } |
1489 | | }; |
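     | | // [Editor's worked example] For the framing parsed by decompress() above: a
     | | // large block whose 12 uncompressed bytes were split into two small blocks
     | | // (with compressed payloads of, say, 5 and 7 bytes) would be laid out as
     | | // (all length fields big-endian uint32):
     | | //   00 00 00 0C               A  = 12, uncompressed size of the large block
     | | //   00 00 00 05  <5 bytes>    B1 = 5,  compressed size of small block 1
     | | //   00 00 00 07  <7 bytes>    B2 = 7,  compressed size of small block 2
     | | // The inner loop stops once A uncompressed bytes have been produced; the
     | | // next large block, if any, then begins with its own A header.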
1490 | | |
1491 | | class BrotliBlockCompression final : public BlockCompressionCodec { |
1492 | | public: |
1493 | 0 | static BrotliBlockCompression* instance() { |
1494 | 0 | static BrotliBlockCompression s_instance; |
1495 | 0 | return &s_instance; |
1496 | 0 | } |
1497 | | |
1498 | 0 | Status compress(const Slice& input, faststring* output) override { |
1499 | 0 |         return Status::InvalidArgument("Brotli compress is not implemented.");
1500 | 0 | } |
1501 | | |
1502 | 0 | size_t max_compressed_len(size_t len) override { return 0; }; |
1503 | | |
1504 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1505 | |         // The size of the output buffer is always equal to the uncompressed length.
1506 | 0 | BrotliDecoderResult result = BrotliDecoderDecompress( |
1507 | 0 | input.get_size(), reinterpret_cast<const uint8_t*>(input.get_data()), &output->size, |
1508 | 0 | reinterpret_cast<uint8_t*>(output->data)); |
1509 | 0 | if (result != BROTLI_DECODER_RESULT_SUCCESS) { |
1510 | 0 | return Status::InternalError("Brotli decompression failed, result={}", result); |
1511 | 0 | } |
1512 | 0 | return Status::OK(); |
1513 | 0 | } |
1514 | | }; |
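     | | // [Editor's note] BrotliDecoderDecompress is a one-shot API whose size
     | | // argument is in/out: buffer capacity on entry, bytes written on return.
     | | // That is why decompress() above can pass &output->size directly.
     | | // Illustrative use:
     | | //   size_t decoded = dst_capacity;
     | | //   BrotliDecoderResult r = BrotliDecoderDecompress(src_len, src, &decoded, dst);
     | | //   // On BROTLI_DECODER_RESULT_SUCCESS, `decoded` holds the output length.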
1515 | | |
1516 | | Status get_block_compression_codec(segment_v2::CompressionTypePB type, |
1517 | 20.7k | BlockCompressionCodec** codec) { |
1518 | 20.7k | switch (type) { |
1519 | 106 | case segment_v2::CompressionTypePB::NO_COMPRESSION: |
1520 | 106 | *codec = nullptr; |
1521 | 106 | break; |
1522 | 47 | case segment_v2::CompressionTypePB::SNAPPY: |
1523 | 47 | *codec = SnappyBlockCompression::instance(); |
1524 | 47 | break; |
1525 | 59 | case segment_v2::CompressionTypePB::LZ4: |
1526 | 59 | *codec = Lz4BlockCompression::instance(); |
1527 | 59 | break; |
1528 | 20.3k | case segment_v2::CompressionTypePB::LZ4F: |
1529 | 20.3k | *codec = Lz4fBlockCompression::instance(); |
1530 | 20.3k | break; |
1531 | 2 | case segment_v2::CompressionTypePB::LZ4HC: |
1532 | 2 | *codec = Lz4HCBlockCompression::instance(); |
1533 | 2 | break; |
1534 | 2 | case segment_v2::CompressionTypePB::ZLIB: |
1535 | 2 | *codec = ZlibBlockCompression::instance(); |
1536 | 2 | break; |
1537 | 169 | case segment_v2::CompressionTypePB::ZSTD: |
1538 | 169 | *codec = ZstdBlockCompression::instance(); |
1539 | 169 | break; |
1540 | 0 | default: |
1541 | 0 | return Status::InternalError("unknown compression type({})", type); |
1542 | 20.7k | } |
1543 | | |
1544 | 20.7k | return Status::OK(); |
1545 | 20.7k | } |
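     | | // [Editor's sketch] Typical lookup-and-use pattern. NO_COMPRESSION yields a
     | | // null codec, so callers must branch on it; RETURN_IF_ERROR is Doris's usual
     | | // Status-propagation macro, and raw_slice is an illustrative input:
     | | //   BlockCompressionCodec* codec = nullptr;
     | | //   RETURN_IF_ERROR(get_block_compression_codec(
     | | //           segment_v2::CompressionTypePB::LZ4F, &codec));
     | | //   if (codec != nullptr) {
     | | //       faststring compressed;
     | | //       RETURN_IF_ERROR(codec->compress(raw_slice, &compressed));
     | | //   }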
1546 | | |
1547 | | // This codec lookup is only used for Hive text write
1548 | 0 | Status get_block_compression_codec(TFileCompressType::type type, BlockCompressionCodec** codec) { |
1549 | 0 | switch (type) { |
1550 | 0 | case TFileCompressType::PLAIN: |
1551 | 0 | *codec = nullptr; |
1552 | 0 | break; |
1553 | 0 | case TFileCompressType::ZLIB: |
1554 | 0 | *codec = ZlibBlockCompression::instance(); |
1555 | 0 | break; |
1556 | 0 | case TFileCompressType::GZ: |
1557 | 0 | *codec = GzipBlockCompression::instance(); |
1558 | 0 | break; |
1559 | 0 | case TFileCompressType::BZ2: |
1560 | 0 | *codec = Bzip2BlockCompression::instance(); |
1561 | 0 | break; |
1562 | 0 | case TFileCompressType::LZ4BLOCK: |
1563 | 0 | *codec = HadoopLz4BlockCompression::instance(); |
1564 | 0 | break; |
1565 | 0 | case TFileCompressType::SNAPPYBLOCK: |
1566 | 0 | *codec = HadoopSnappyBlockCompression::instance(); |
1567 | 0 | break; |
1568 | 0 | case TFileCompressType::ZSTD: |
1569 | 0 | *codec = ZstdBlockCompression::instance(); |
1570 | 0 | break; |
1571 | 0 | default: |
1572 | 0 |         return Status::InternalError("unsupported compression type({}) in hive text", type);
1573 | 0 | } |
1574 | | |
1575 | 0 | return Status::OK(); |
1576 | 0 | } |
1577 | | |
1578 | | Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec, |
1579 | 39 | BlockCompressionCodec** codec) { |
1580 | 39 | switch (parquet_codec) { |
1581 | 0 | case tparquet::CompressionCodec::UNCOMPRESSED: |
1582 | 0 | *codec = nullptr; |
1583 | 0 | break; |
1584 | 39 | case tparquet::CompressionCodec::SNAPPY: |
1585 | 39 | *codec = SnappyBlockCompression::instance(); |
1586 | 39 | break; |
1587 | 0 |     case tparquet::CompressionCodec::LZ4_RAW: // the LZ4 block algorithm can also parse LZ4_RAW
1588 | 0 | case tparquet::CompressionCodec::LZ4: |
1589 | 0 | *codec = HadoopLz4BlockCompression::instance(); |
1590 | 0 | break; |
1591 | 0 | case tparquet::CompressionCodec::ZSTD: |
1592 | 0 | *codec = ZstdBlockCompression::instance(); |
1593 | 0 | break; |
1594 | 0 | case tparquet::CompressionCodec::GZIP: |
1595 | | // Only used on x86 or x86_64 |
1596 | 0 | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
1597 | 0 | defined(__i386) || defined(_M_IX86) |
1598 | 0 | *codec = GzipBlockCompressionByLibdeflate::instance(); |
1599 | | #else |
1600 | | *codec = GzipBlockCompression::instance(); |
1601 | | #endif |
1602 | 0 | break; |
1603 | 0 | case tparquet::CompressionCodec::LZO: |
1604 | 0 | *codec = LzoBlockCompression::instance(); |
1605 | 0 | break; |
1606 | 0 | case tparquet::CompressionCodec::BROTLI: |
1607 | 0 | *codec = BrotliBlockCompression::instance(); |
1608 | 0 | break; |
1609 | 0 | default: |
1610 | 0 | return Status::InternalError("unknown compression type({})", parquet_codec); |
1611 | 39 | } |
1612 | | |
1613 | 39 | return Status::OK(); |
1614 | 39 | } |
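     | | // [Editor's sketch] In a Parquet reader the output Slice is sized from the
     | | // page header's uncompressed_page_size before calling decompress(); the
     | | // names below (column_meta, page_header, dst_buf) are illustrative, not
     | | // from this file:
     | | //   BlockCompressionCodec* codec = nullptr;
     | | //   RETURN_IF_ERROR(get_block_compression_codec(column_meta.codec, &codec));
     | | //   Slice out(dst_buf, page_header.uncompressed_page_size);
     | | //   if (codec != nullptr) {
     | | //       RETURN_IF_ERROR(codec->decompress(compressed_page, &out));
     | | //   }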
1615 | | |
1616 | | } // namespace doris |