/root/doris/be/src/util/block_compression.cpp
Line | Count | Source
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "util/block_compression.h" |
19 | | |
20 | | #include <bzlib.h> |
21 | | #include <gen_cpp/parquet_types.h> |
22 | | #include <gen_cpp/segment_v2.pb.h> |
23 | | #include <glog/logging.h> |
24 | | |
25 | | #include <exception> |
26 | | // Only used on x86 or x86_64 |
27 | | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
28 | | defined(__i386) || defined(_M_IX86) |
29 | | #include <libdeflate.h> |
30 | | #endif |
31 | | #include <brotli/decode.h> |
32 | | #include <glog/log_severity.h> |
33 | | #include <glog/logging.h> |
34 | | #include <lz4/lz4.h> |
35 | | #include <lz4/lz4frame.h> |
36 | | #include <lz4/lz4hc.h> |
37 | | #include <snappy/snappy-sinksource.h> |
38 | | #include <snappy/snappy.h> |
39 | | #include <zconf.h> |
40 | | #include <zlib.h> |
41 | | #include <zstd.h> |
42 | | #include <zstd_errors.h> |
43 | | |
44 | | #include <algorithm> |
45 | | #include <cstdint> |
46 | | #include <limits> |
47 | | #include <mutex> |
48 | | #include <orc/Exceptions.hh> |
49 | | #include <ostream> |
50 | | |
51 | | #include "common/config.h" |
52 | | #include "common/factory_creator.h" |
53 | | #include "exec/decompressor.h" |
54 | | #include "gutil/endian.h" |
55 | | #include "gutil/strings/substitute.h" |
56 | | #include "runtime/thread_context.h" |
57 | | #include "util/defer_op.h" |
58 | | #include "util/faststring.h" |
59 | | |
60 | | namespace orc { |
61 | | /** |
62 | | * Decompress the bytes in to the output buffer. |
63 | | * @param inputAddress the start of the input |
64 | | * @param inputLimit one past the last byte of the input |
65 | | * @param outputAddress the start of the output buffer |
66 | | * @param outputLimit one past the last byte of the output buffer |
67 | | * @result the number of bytes decompressed |
68 | | */ |
69 | | uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress, |
70 | | char* outputLimit); |
71 | | } // namespace orc |
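| | // Hedged usage sketch for the declaration above; the names and the |
| | // assumption that the caller already knows the decompressed size are |
| | // illustrative, not taken from this file: |
| | // |
| | //   std::vector<char> out(known_raw_size); |
| | //   uint64_t n = orc::lzoDecompress(src, src + src_size, |
| | //                                   out.data(), out.data() + out.size()); |
| | //   // n is the number of bytes actually written to `out` |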
72 | | |
73 | | namespace doris { |
74 | | |
75 | | // exception safe |
76 | | Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
77 | 2 | faststring* output) { |
78 | 2 | faststring buf; |
79 | | // reserve the total size up front to avoid extra copies from reallocation |
80 | 2 | buf.reserve(uncompressed_size); |
81 | 10 | for (auto& input : inputs) { |
82 | 10 | buf.append(input.data, input.size); |
83 | 10 | } |
84 | 2 | return compress(buf, output); |
85 | 2 | } |
86 | | |
87 | 15.7k | bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) { |
88 | 15.7k | return uncompressed_size > std::numeric_limits<int32_t>::max(); |
89 | 15.7k | } |
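| | // Hedged caller sketch for the slice-based compress() overload above; the |
| | // codec instance and payload are illustrative assumptions: |
| | // |
| | //   std::vector<Slice> slices {Slice("abc", 3), Slice("def", 3)}; |
| | //   faststring compressed; |
| | //   RETURN_IF_ERROR(codec->compress(slices, /*uncompressed_size=*/6, &compressed)); |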
90 | | |
91 | | class Lz4BlockCompression : public BlockCompressionCodec { |
92 | | private: |
93 | | class Context { |
94 | | ENABLE_FACTORY_CREATOR(Context); |
95 | | |
96 | | public: |
97 | 1 | Context() : ctx(nullptr) { |
98 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
99 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
100 | 1 | buffer = std::make_unique<faststring>(); |
101 | 1 | } |
102 | | LZ4_stream_t* ctx; |
103 | | std::unique_ptr<faststring> buffer; |
104 | 1 | ~Context() { |
105 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
106 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
107 | 1 | if (ctx) { |
108 | 1 | LZ4_freeStream(ctx); |
109 | 1 | } |
110 | 1 | buffer.reset(); |
111 | 1 | } |
112 | | }; |
113 | | |
114 | | public: |
115 | 59 | static Lz4BlockCompression* instance() { |
116 | 59 | static Lz4BlockCompression s_instance; |
117 | 59 | return &s_instance; |
118 | 59 | } |
119 | 1 | ~Lz4BlockCompression() override { |
120 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
121 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
122 | 1 | _ctx_pool.clear(); |
123 | 1 | } |
124 | | |
125 | 44 | Status compress(const Slice& input, faststring* output) override { |
126 | 44 | if (input.size > LZ4_MAX_INPUT_SIZE) { |
127 | 0 | return Status::InvalidArgument(
128 | 0 | "LZ4 does not support inputs larger than LZ4_MAX_INPUT_SIZE; consider " |
129 | 0 | "changing " |
130 | 0 | "fragment_transmission_compression_codec to snappy. input.size={}, " |
131 | 0 | "LZ4_MAX_INPUT_SIZE={}", |
132 | 0 | input.size, LZ4_MAX_INPUT_SIZE);
133 | 0 | } |
134 | | |
135 | 44 | std::unique_ptr<Context> context; |
136 | 44 | RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
137 | 44 | bool compress_failed = false; |
138 | 44 | Defer defer {[&] { |
139 | 44 | if (!compress_failed) { |
140 | 44 | _release_compression_ctx(std::move(context)); |
141 | 44 | } |
142 | 44 | }}; |
143 | | |
144 | 44 | try { |
145 | 44 | Slice compressed_buf; |
146 | 44 | size_t max_len = max_compressed_len(input.size); |
147 | 44 | if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
148 | | // use output directly |
149 | 0 | output->resize(max_len); |
150 | 0 | compressed_buf.data = reinterpret_cast<char*>(output->data()); |
151 | 0 | compressed_buf.size = max_len; |
152 | 44 | } else { |
153 | | // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE |
154 | 44 | { |
155 | | // context->buffer is reusable across queries, so it should be accounted |
156 | | // to the global tracker. |
157 | 44 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
158 | 44 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
159 | 44 | context->buffer->resize(max_len); |
160 | 44 | } |
161 | 44 | compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
162 | 44 | compressed_buf.size = max_len; |
163 | 44 | } |
164 | | |
165 | 44 | size_t compressed_len = |
166 | 44 | LZ4_compress_fast_continue(context->ctx, input.data, compressed_buf.data, |
167 | 44 | input.size, compressed_buf.size, ACCELERATION);
168 | 44 | if (compressed_len == 0) { |
169 | 0 | compress_failed = true; |
170 | 0 | return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", |
171 | 0 | compressed_buf.size); |
172 | 0 | } |
173 | 44 | output->resize(compressed_len); |
174 | 44 | if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
175 | 44 | output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), |
176 | 44 | compressed_len); |
177 | 44 | } |
178 | 44 | } catch (...) { |
179 | | // compress_failed is deliberately left false so the Defer releases the context |
180 | 0 | DCHECK(!compress_failed); |
181 | 0 | return Status::InternalError("Fail to do LZ4Block compress due to exception"); |
182 | 0 | } |
183 | 44 | return Status::OK(); |
184 | 44 | } |
185 | | |
186 | 30 | Status decompress(const Slice& input, Slice* output) override { |
187 | 30 | auto decompressed_len = |
188 | 30 | LZ4_decompress_safe(input.data, output->data, input.size, output->size); |
189 | 30 | if (decompressed_len < 0) { |
190 | 5 | return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len); |
191 | 5 | } |
192 | 25 | output->size = decompressed_len; |
193 | 25 | return Status::OK(); |
194 | 30 | } |
195 | | |
196 | 44 | size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); } |
197 | | |
198 | | private: |
199 | | // reuse LZ4 compress stream |
200 | 44 | Status _acquire_compression_ctx(std::unique_ptr<Context>& out) { |
201 | 44 | std::lock_guard<std::mutex> l(_ctx_mutex); |
202 | 44 | if (_ctx_pool.empty()) { |
203 | 1 | std::unique_ptr<Context> localCtx = Context::create_unique(); |
204 | 1 | if (localCtx.get() == nullptr) { |
205 | 0 | return Status::InvalidArgument("new LZ4 context error"); |
206 | 0 | } |
207 | 1 | localCtx->ctx = LZ4_createStream(); |
208 | 1 | if (localCtx->ctx == nullptr) { |
209 | 0 | return Status::InvalidArgument("LZ4_createStream error"); |
210 | 0 | } |
211 | 1 | out = std::move(localCtx); |
212 | 1 | return Status::OK(); |
213 | 1 | } |
214 | 43 | out = std::move(_ctx_pool.back()); |
215 | 43 | _ctx_pool.pop_back(); |
216 | 43 | return Status::OK(); |
217 | 44 | } |
218 | 44 | void _release_compression_ctx(std::unique_ptr<Context> context) { |
219 | 44 | DCHECK(context); |
220 | 44 | LZ4_resetStream(context->ctx); |
221 | 44 | std::lock_guard<std::mutex> l(_ctx_mutex); |
222 | 44 | _ctx_pool.push_back(std::move(context)); |
223 | 44 | } |
224 | | |
225 | | private: |
226 | | mutable std::mutex _ctx_mutex; |
227 | | mutable std::vector<std::unique_ptr<Context>> _ctx_pool; |
228 | | static const int32_t ACCELERATION = 1;
229 | | }; |
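| | // Hedged round-trip sketch for the codec above. Raw LZ4 blocks carry no |
| | // length header, so the caller must size the output slice with the known |
| | // uncompressed size; the names here are illustrative: |
| | // |
| | //   faststring compressed; |
| | //   RETURN_IF_ERROR(Lz4BlockCompression::instance()->compress(raw, &compressed)); |
| | //   std::string buf(raw.size, '\0'); // caller-tracked uncompressed size |
| | //   Slice out(buf.data(), buf.size()); |
| | //   RETURN_IF_ERROR(Lz4BlockCompression::instance()->decompress( |
| | //           Slice(compressed.data(), compressed.size()), &out)); |
| | //   // out.size now holds the exact decompressed length |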
230 | | |
231 | | class HadoopLz4BlockCompression : public Lz4BlockCompression { |
232 | | public: |
233 | 0 | HadoopLz4BlockCompression() { |
234 | 0 | Status st = Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor); |
235 | 0 | if (!st.ok()) { |
236 | 0 | throw Exception(Status::FatalError( |
237 | 0 | "HadoopLz4BlockCompression construction failed. status = {}", st)); |
238 | 0 | } |
239 | 0 | } |
240 | | |
241 | 0 | ~HadoopLz4BlockCompression() override = default; |
242 | | |
243 | 0 | static HadoopLz4BlockCompression* instance() { |
244 | 0 | static HadoopLz4BlockCompression s_instance; |
245 | 0 | return &s_instance; |
246 | 0 | } |
247 | | |
248 | | // hadoop uses block compression for lz4 |
249 | | // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc |
250 | 0 | Status compress(const Slice& input, faststring* output) override { |
251 | | // keep consistent with hadoop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
252 | 0 | size_t lz4_block_size = config::lz4_compression_block_size; |
253 | 0 | size_t overhead = lz4_block_size / 255 + 16; |
254 | 0 | size_t max_input_size = lz4_block_size - overhead; |
255 | |
256 | 0 | size_t data_len = input.size; |
257 | 0 | char* data = input.data; |
258 | 0 | std::vector<OwnedSlice> buffers; |
259 | 0 | size_t out_len = 0; |
260 | |
261 | 0 | while (data_len > 0) { |
262 | 0 | size_t input_size = std::min(data_len, max_input_size); |
263 | 0 | Slice input_slice(data, input_size); |
264 | 0 | faststring output_data; |
265 | 0 | RETURN_IF_ERROR(Lz4BlockCompression::compress(input_slice, &output_data)); |
266 | 0 | out_len += output_data.size(); |
267 | 0 | buffers.push_back(output_data.build()); |
268 | 0 | data += input_size; |
269 | 0 | data_len -= input_size; |
270 | 0 | } |
271 | | |
272 | | // hadoop block compression: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ... |
273 | 0 | size_t total_output_len = 4 + 4 * buffers.size() + out_len; |
274 | 0 | output->resize(total_output_len); |
275 | 0 | char* output_buffer = (char*)output->data(); |
276 | 0 | BigEndian::Store32(output_buffer, input.get_size()); |
277 | 0 | output_buffer += 4; |
278 | 0 | for (const auto& buffer : buffers) { |
279 | 0 | auto slice = buffer.slice(); |
280 | 0 | BigEndian::Store32(output_buffer, slice.get_size()); |
281 | 0 | output_buffer += 4; |
282 | 0 | memcpy(output_buffer, slice.get_data(), slice.get_size()); |
283 | 0 | output_buffer += slice.get_size(); |
284 | 0 | } |
285 | |
286 | 0 | DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len); |
287 | |
288 | 0 | return Status::OK(); |
289 | 0 | } |
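| | // Hedged reader-side sketch of the block layout written above, assuming |
| | // BigEndian::Load32 as the counterpart of BigEndian::Store32; the names are |
| | // illustrative: |
| | // |
| | //   const char* p = compressed; |
| | //   const char* end = compressed + compressed_len; |
| | //   uint32_t raw_len = BigEndian::Load32(p); // total uncompressed length |
| | //   p += 4; |
| | //   while (p < end) { |
| | //       uint32_t block_len = BigEndian::Load32(p); // one compressed block |
| | //       p += 4; |
| | //       // decompress block_len bytes starting at p, then advance |
| | //       p += block_len; |
| | //   } |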
290 | | |
291 | 0 | Status decompress(const Slice& input, Slice* output) override { |
292 | 0 | size_t input_bytes_read = 0; |
293 | 0 | size_t decompressed_len = 0; |
294 | 0 | size_t more_input_bytes = 0; |
295 | 0 | size_t more_output_bytes = 0; |
296 | 0 | bool stream_end = false; |
297 | 0 | auto st = _decompressor->decompress((uint8_t*)input.data, input.size, &input_bytes_read, |
298 | 0 | (uint8_t*)output->data, output->size, &decompressed_len, |
299 | 0 | &stream_end, &more_input_bytes, &more_output_bytes); |
300 | | // try decompressing as hadoop LZ4 first; if that fails, fall back to plain LZ4. |
301 | 0 | return (st != Status::OK() || stream_end != true) |
302 | 0 | ? Lz4BlockCompression::decompress(input, output) |
303 | 0 | : Status::OK(); |
304 | 0 | } |
305 | | |
306 | | private: |
307 | | std::unique_ptr<Decompressor> _decompressor; |
308 | | }; |
309 | | // Implements the LZ4 frame format; its decompression is about two times faster than raw LZ4. |
310 | | class Lz4fBlockCompression : public BlockCompressionCodec { |
311 | | private: |
312 | | class CContext { |
313 | | ENABLE_FACTORY_CREATOR(CContext); |
314 | | |
315 | | public: |
316 | 1 | CContext() : ctx(nullptr) { |
317 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
318 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
319 | 1 | buffer = std::make_unique<faststring>(); |
320 | 1 | } |
321 | | LZ4F_compressionContext_t ctx; |
322 | | std::unique_ptr<faststring> buffer; |
323 | 1 | ~CContext() { |
324 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
325 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
326 | 1 | if (ctx) { |
327 | 1 | LZ4F_freeCompressionContext(ctx); |
328 | 1 | } |
329 | 1 | buffer.reset(); |
330 | 1 | } |
331 | | }; |
332 | | class DContext { |
333 | | ENABLE_FACTORY_CREATOR(DContext); |
334 | | |
335 | | public: |
336 | 6 | DContext() : ctx(nullptr) {} |
337 | | LZ4F_decompressionContext_t ctx; |
338 | 6 | ~DContext() { |
339 | 6 | if (ctx) { |
340 | 6 | LZ4F_freeDecompressionContext(ctx); |
341 | 6 | } |
342 | 6 | } |
343 | | }; |
344 | | |
345 | | public: |
346 | 25.0k | static Lz4fBlockCompression* instance() { |
347 | 25.0k | static Lz4fBlockCompression s_instance; |
348 | 25.0k | return &s_instance; |
349 | 25.0k | } |
350 | 1 | ~Lz4fBlockCompression() { |
351 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
352 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
353 | 1 | _ctx_c_pool.clear(); |
354 | 1 | _ctx_d_pool.clear(); |
355 | 1 | } |
356 | | |
357 | 5 | Status compress(const Slice& input, faststring* output) override { |
358 | 5 | std::vector<Slice> inputs {input}; |
359 | 5 | return compress(inputs, input.size, output); |
360 | 5 | } |
361 | | |
362 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
363 | 15.6k | faststring* output) override { |
364 | 15.6k | return _compress(inputs, uncompressed_size, output); |
365 | 15.6k | } |
366 | | |
367 | 6.75k | Status decompress(const Slice& input, Slice* output) override { |
368 | 6.75k | return _decompress(input, output); |
369 | 6.75k | } |
370 | | |
371 | 15.6k | size_t max_compressed_len(size_t len) override { |
372 | 15.6k | return std::max(LZ4F_compressBound(len, &_s_preferences), |
373 | 15.6k | LZ4F_compressFrameBound(len, &_s_preferences)); |
374 | 15.6k | } |
375 | | |
376 | | private: |
377 | | Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
378 | 15.6k | faststring* output) { |
379 | 15.6k | std::unique_ptr<CContext> context; |
380 | 15.6k | RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
381 | 15.6k | bool compress_failed = false; |
382 | 15.6k | Defer defer {[&] { |
383 | 15.6k | if (!compress_failed) { |
384 | 15.6k | _release_compression_ctx(std::move(context)); |
385 | 15.6k | } |
386 | 15.6k | }}; |
387 | | |
388 | 15.6k | try { |
389 | 15.6k | Slice compressed_buf; |
390 | 15.6k | size_t max_len = max_compressed_len(uncompressed_size); |
391 | 15.6k | if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
392 | | // use output directly |
393 | 0 | output->resize(max_len); |
394 | 0 | compressed_buf.data = reinterpret_cast<char*>(output->data()); |
395 | 0 | compressed_buf.size = max_len; |
396 | 15.6k | } else { |
397 | 15.6k | { |
398 | 15.6k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
399 | 15.6k | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
400 | | // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE |
401 | 15.6k | context->buffer->resize(max_len); |
402 | 15.6k | } |
403 | 15.6k | compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
404 | 15.6k | compressed_buf.size = max_len; |
405 | 15.6k | } |
406 | | |
407 | 15.6k | auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size, |
408 | 15.6k | &_s_preferences); |
409 | 15.6k | if (LZ4F_isError(wbytes)) { |
410 | 0 | compress_failed = true; |
411 | 0 | return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}", |
412 | 0 | LZ4F_getErrorName(wbytes)); |
413 | 0 | } |
414 | 15.6k | size_t offset = wbytes; |
415 | 15.6k | for (auto input : inputs) { |
416 | 15.6k | wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset, |
417 | 15.6k | compressed_buf.size - offset, input.data, input.size, |
418 | 15.6k | nullptr); |
419 | 15.6k | if (LZ4F_isError(wbytes)) { |
420 | 0 | compress_failed = true; |
421 | 0 | return Status::InvalidArgument("Fail to do LZ4F compress update, res={}", |
422 | 0 | LZ4F_getErrorName(wbytes)); |
423 | 0 | } |
424 | 15.6k | offset += wbytes; |
425 | 15.6k | } |
426 | 15.6k | wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset, |
427 | 15.6k | compressed_buf.size - offset, nullptr); |
428 | 15.6k | if (LZ4F_isError(wbytes)) { |
429 | 0 | compress_failed = true; |
430 | 0 | return Status::InvalidArgument("Fail to do LZ4F compress end, res={}", |
431 | 0 | LZ4F_getErrorName(wbytes)); |
432 | 0 | } |
433 | 15.6k | offset += wbytes; |
434 | 15.6k | output->resize(offset); |
435 | 15.6k | if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
436 | 15.6k | output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset); |
437 | 15.6k | } |
438 | 15.6k | } catch (...) { |
439 | | // compress_failed is deliberately left false so the Defer releases the context |
440 | 0 | DCHECK(!compress_failed); |
441 | 0 | return Status::InternalError("Fail to do LZ4F compress due to exception"); |
442 | 0 | } |
443 | | |
444 | 15.6k | return Status::OK(); |
445 | 15.6k | } |
446 | | |
447 | 6.75k | Status _decompress(const Slice& input, Slice* output) { |
448 | 6.75k | bool decompress_failed = false; |
449 | 6.75k | std::unique_ptr<DContext> context; |
450 | 6.75k | RETURN_IF_ERROR(_acquire_decompression_ctx(context)); |
451 | 6.75k | Defer defer {[&] { |
452 | 6.75k | if (!decompress_failed) { |
453 | 6.74k | _release_decompression_ctx(std::move(context)); |
454 | 6.74k | } |
455 | 6.75k | }}; |
456 | 6.75k | size_t input_size = input.size; |
457 | 6.75k | auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data, |
458 | 6.75k | &input_size, nullptr); |
459 | 6.75k | if (LZ4F_isError(lres)) { |
460 | 0 | decompress_failed = true; |
461 | 0 | return Status::InternalError("Fail to do LZ4F decompress, res={}", |
462 | 0 | LZ4F_getErrorName(lres)); |
463 | 6.75k | } else if (input_size != input.size) { |
464 | 0 | decompress_failed = true; |
465 | 0 | return Status::InvalidArgument( |
466 | 0 | strings::Substitute("Fail to do LZ4F decompress: trailing data left in " |
467 | 0 | "compressed data, read=$0 vs given=$1", |
468 | 0 | input_size, input.size)); |
469 | 6.75k | } else if (lres != 0) { |
470 | 5 | decompress_failed = true; |
471 | 5 | return Status::InvalidArgument( |
472 | 5 | "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres); |
473 | 5 | } |
474 | 6.74k | return Status::OK(); |
475 | 6.75k | } |
476 | | |
477 | | private: |
478 | | // acquire a compression ctx from the pool; release it back once compression |
479 | | // finishes, or drop it entirely if compression failed |
480 | 15.6k | Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) { |
481 | 15.6k | std::lock_guard<std::mutex> l(_ctx_c_mutex); |
482 | 15.6k | if (_ctx_c_pool.empty()) { |
483 | 1 | std::unique_ptr<CContext> localCtx = CContext::create_unique(); |
484 | 1 | if (localCtx.get() == nullptr) { |
485 | 0 | return Status::InvalidArgument("failed to new LZ4F CContext"); |
486 | 0 | } |
487 | 1 | auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION); |
488 | 1 | if (LZ4F_isError(res) != 0) { |
489 | 0 | return Status::InvalidArgument(strings::Substitute( |
490 | 0 | "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res))); |
491 | 0 | } |
492 | 1 | out = std::move(localCtx); |
493 | 1 | return Status::OK(); |
494 | 1 | } |
495 | 15.6k | out = std::move(_ctx_c_pool.back()); |
496 | 15.6k | _ctx_c_pool.pop_back(); |
497 | 15.6k | return Status::OK(); |
498 | 15.6k | } |
499 | 15.6k | void _release_compression_ctx(std::unique_ptr<CContext> context) { |
500 | 15.6k | DCHECK(context); |
501 | 15.6k | std::lock_guard<std::mutex> l(_ctx_c_mutex); |
502 | 15.6k | _ctx_c_pool.push_back(std::move(context)); |
503 | 15.6k | } |
504 | | |
505 | 6.75k | Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) { |
506 | 6.75k | std::lock_guard<std::mutex> l(_ctx_d_mutex); |
507 | 6.75k | if (_ctx_d_pool.empty()) { |
508 | 6 | std::unique_ptr<DContext> localCtx = DContext::create_unique(); |
509 | 6 | if (localCtx.get() == nullptr) { |
510 | 0 | return Status::InvalidArgument("failed to new LZ4F DContext"); |
511 | 0 | } |
512 | 6 | auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION); |
513 | 6 | if (LZ4F_isError(res) != 0) { |
514 | 0 | return Status::InvalidArgument(strings::Substitute( |
515 | 0 | "LZ4F_createDecompressionContext error, res=$0", LZ4F_getErrorName(res)));
516 | 0 | } |
517 | 6 | out = std::move(localCtx); |
518 | 6 | return Status::OK(); |
519 | 6 | } |
520 | 6.74k | out = std::move(_ctx_d_pool.back()); |
521 | 6.74k | _ctx_d_pool.pop_back(); |
522 | 6.74k | return Status::OK(); |
523 | 6.75k | } |
524 | 6.74k | void _release_decompression_ctx(std::unique_ptr<DContext> context) { |
525 | 6.74k | DCHECK(context); |
526 | | // reset decompression context to avoid ERROR_maxBlockSize_invalid |
527 | 6.74k | LZ4F_resetDecompressionContext(context->ctx); |
528 | 6.74k | std::lock_guard<std::mutex> l(_ctx_d_mutex); |
529 | 6.74k | _ctx_d_pool.push_back(std::move(context)); |
530 | 6.74k | } |
531 | | |
532 | | private: |
533 | | static LZ4F_preferences_t _s_preferences; |
534 | | |
535 | | std::mutex _ctx_c_mutex; |
536 | | // LZ4F_compressionContext_t is a pointer so no copy here |
537 | | std::vector<std::unique_ptr<CContext>> _ctx_c_pool; |
538 | | |
539 | | std::mutex _ctx_d_mutex; |
540 | | std::vector<std::unique_ptr<DContext>> _ctx_d_pool; |
541 | | }; |
542 | | |
543 | | LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = { |
544 | | {LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0ULL, 0U, |
545 | | LZ4F_noBlockChecksum}, |
546 | | 0, |
547 | | 0u, |
548 | | 0u, |
549 | | {0u, 0u, 0u}}; |
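| | // Field-by-field reading of the initializer above (layout per |
| | // LZ4F_preferences_t): frameInfo = {256 KB linked blocks, no content |
| | // checksum, LZ4F_frame, unknown content size (0), no dictID, no block |
| | // checksums}; then compressionLevel = 0 (fast default), autoFlush = 0, |
| | // favorDecSpeed = 0, and the reserved words zeroed. |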
550 | | |
551 | | class Lz4HCBlockCompression : public BlockCompressionCodec { |
552 | | private: |
553 | | class Context { |
554 | | ENABLE_FACTORY_CREATOR(Context); |
555 | | |
556 | | public: |
557 | 1 | Context() : ctx(nullptr) { |
558 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
559 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
560 | 1 | buffer = std::make_unique<faststring>(); |
561 | 1 | } |
562 | | LZ4_streamHC_t* ctx; |
563 | | std::unique_ptr<faststring> buffer; |
564 | 1 | ~Context() { |
565 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
566 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
567 | 1 | if (ctx) { |
568 | 1 | LZ4_freeStreamHC(ctx); |
569 | 1 | } |
570 | 1 | buffer.reset(); |
571 | 1 | } |
572 | | }; |
573 | | |
574 | | public: |
575 | 2 | static Lz4HCBlockCompression* instance() { |
576 | 2 | static Lz4HCBlockCompression s_instance; |
577 | 2 | return &s_instance; |
578 | 2 | } |
579 | 1 | ~Lz4HCBlockCompression() { |
580 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
581 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
582 | 1 | _ctx_pool.clear(); |
583 | 1 | } |
584 | | |
585 | 6 | Status compress(const Slice& input, faststring* output) override { |
586 | 6 | std::unique_ptr<Context> context; |
587 | 6 | RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
588 | 6 | bool compress_failed = false; |
589 | 6 | Defer defer {[&] { |
590 | 6 | if (!compress_failed) { |
591 | 6 | _release_compression_ctx(std::move(context)); |
592 | 6 | } |
593 | 6 | }}; |
594 | | |
595 | 6 | try { |
596 | 6 | Slice compressed_buf; |
597 | 6 | size_t max_len = max_compressed_len(input.size); |
598 | 6 | if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
599 | | // use output directly |
600 | 0 | output->resize(max_len); |
601 | 0 | compressed_buf.data = reinterpret_cast<char*>(output->data()); |
602 | 0 | compressed_buf.size = max_len; |
603 | 6 | } else { |
604 | 6 | { |
605 | 6 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
606 | 6 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
607 | | // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE |
608 | 6 | context->buffer->resize(max_len); |
609 | 6 | } |
610 | 6 | compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
611 | 6 | compressed_buf.size = max_len; |
612 | 6 | } |
613 | | |
614 | 6 | size_t compressed_len = LZ4_compress_HC_continue( |
615 | 6 | context->ctx, input.data, compressed_buf.data, input.size, compressed_buf.size); |
616 | 6 | if (compressed_len == 0) { |
617 | 0 | compress_failed = true; |
618 | 0 | return Status::InvalidArgument("Output buffer's capacity is not enough, size={}", |
619 | 0 | compressed_buf.size); |
620 | 0 | } |
621 | 6 | output->resize(compressed_len); |
622 | 6 | if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
623 | 6 | output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), |
624 | 6 | compressed_len); |
625 | 6 | } |
626 | 6 | } catch (...) { |
627 | | // compress_failed is deliberately left false so the Defer releases the context |
628 | 0 | DCHECK(!compress_failed); |
629 | 0 | return Status::InternalError("Fail to do LZ4HC compress due to exception"); |
630 | 0 | } |
631 | 6 | return Status::OK(); |
632 | 6 | } |
633 | | |
634 | 11 | Status decompress(const Slice& input, Slice* output) override { |
635 | 11 | auto decompressed_len = |
636 | 11 | LZ4_decompress_safe(input.data, output->data, input.size, output->size); |
637 | 11 | if (decompressed_len < 0) { |
638 | 5 | return Status::InvalidArgument( |
639 | 5 | "destination buffer is not large enough or the source stream is detected " |
640 | 5 | "malformed, fail to do LZ4 decompress, error={}", |
641 | 5 | decompressed_len); |
642 | 5 | } |
643 | 6 | output->size = decompressed_len; |
644 | 6 | return Status::OK(); |
645 | 11 | } |
646 | | |
647 | 6 | size_t max_compressed_len(size_t len) override { return LZ4_compressBound(len); } |
648 | | |
649 | | private: |
650 | 6 | Status _acquire_compression_ctx(std::unique_ptr<Context>& out) { |
651 | 6 | std::lock_guard<std::mutex> l(_ctx_mutex); |
652 | 6 | if (_ctx_pool.empty()) { |
653 | 1 | std::unique_ptr<Context> localCtx = Context::create_unique(); |
654 | 1 | if (localCtx.get() == nullptr) { |
655 | 0 | return Status::InvalidArgument("new LZ4HC context error"); |
656 | 0 | } |
657 | 1 | localCtx->ctx = LZ4_createStreamHC(); |
658 | 1 | if (localCtx->ctx == nullptr) { |
659 | 0 | return Status::InvalidArgument("LZ4_createStreamHC error"); |
660 | 0 | } |
661 | 1 | out = std::move(localCtx); |
662 | 1 | return Status::OK(); |
663 | 1 | } |
664 | 5 | out = std::move(_ctx_pool.back()); |
665 | 5 | _ctx_pool.pop_back(); |
666 | 5 | return Status::OK(); |
667 | 6 | } |
668 | 6 | void _release_compression_ctx(std::unique_ptr<Context> context) { |
669 | 6 | DCHECK(context); |
670 | 6 | LZ4_resetStreamHC_fast(context->ctx, _compression_level); |
671 | 6 | std::lock_guard<std::mutex> l(_ctx_mutex); |
672 | 6 | _ctx_pool.push_back(std::move(context)); |
673 | 6 | } |
674 | | |
675 | | private: |
676 | | int64_t _compression_level = config::LZ4_HC_compression_level; |
677 | | mutable std::mutex _ctx_mutex; |
678 | | mutable std::vector<std::unique_ptr<Context>> _ctx_pool; |
679 | | }; |
680 | | |
681 | | class SnappySlicesSource : public snappy::Source { |
682 | | public: |
683 | | SnappySlicesSource(const std::vector<Slice>& slices) |
684 | 1 | : _available(0), _cur_slice(0), _slice_off(0) { |
685 | 5 | for (auto& slice : slices) { |
686 | | // filter out empty slices here so Peek()/Skip() never see a zero-length region |
687 | 5 | if (slice.size == 0) { |
688 | 1 | continue; |
689 | 1 | } |
690 | 4 | _available += slice.size; |
691 | 4 | _slices.push_back(slice); |
692 | 4 | } |
693 | 1 | } |
694 | 1 | ~SnappySlicesSource() override {} |
695 | | |
696 | | // Return the number of bytes left to read from the source |
697 | 1 | size_t Available() const override { return _available; } |
698 | | |
699 | | // Peek at the next flat region of the source. Does not reposition |
700 | | // the source. The returned region is empty iff Available()==0. |
701 | | // |
702 | | // Returns a pointer to the beginning of the region and store its |
703 | | // length in *len. |
704 | | // |
705 | | // The returned region is valid until the next call to Skip() or |
706 | | // until this object is destroyed, whichever occurs first. |
707 | | // |
708 | | // The returned region may be larger than Available() (for example |
709 | | // if this ByteSource is a view on a substring of a larger source). |
710 | | // The caller is responsible for ensuring that it only reads the |
711 | | // Available() bytes. |
712 | 19 | const char* Peek(size_t* len) override { |
713 | 19 | if (_available == 0) { |
714 | 0 | *len = 0; |
715 | 0 | return nullptr; |
716 | 0 | } |
717 | | // ensure that *len is not 0 |
718 | 19 | *len = _slices[_cur_slice].size - _slice_off; |
719 | 19 | DCHECK(*len != 0); |
720 | 19 | return _slices[_cur_slice].data + _slice_off; |
721 | 19 | } |
722 | | |
723 | | // Skip the next n bytes. Invalidates any buffer returned by |
724 | | // a previous call to Peek(). |
725 | | // REQUIRES: Available() >= n |
726 | 20 | void Skip(size_t n) override { |
727 | 20 | _available -= n; |
728 | 24 | while (n > 0) { |
729 | 19 | auto left = _slices[_cur_slice].size - _slice_off; |
730 | 19 | if (left > n) { |
731 | | // n can be fully consumed within the current slice |
732 | 15 | _slice_off += n; |
733 | 15 | return; |
734 | 15 | } |
735 | 4 | _slice_off = 0; |
736 | 4 | _cur_slice++; |
737 | 4 | n -= left; |
738 | 4 | } |
739 | 20 | } |
740 | | |
741 | | private: |
742 | | std::vector<Slice> _slices; |
743 | | size_t _available; |
744 | | size_t _cur_slice; |
745 | | size_t _slice_off; |
746 | | }; |
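| | // How snappy drives the Source above (a conceptual sketch, not code from |
| | // this file): Compress() repeatedly Peek()s a flat region, consumes some |
| | // prefix of it, and Skip()s what it used until Available() reaches zero. |
| | // |
| | //   size_t len; |
| | //   while (source.Available() > 0) { |
| | //       const char* region = source.Peek(&len); // flat region, len > 0 |
| | //       size_t used = consume(region, len);     // hypothetical consumer |
| | //       source.Skip(used);                      // invalidates `region` |
| | //   } |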
747 | | |
748 | | class SnappyBlockCompression : public BlockCompressionCodec { |
749 | | public: |
750 | 86 | static SnappyBlockCompression* instance() { |
751 | 86 | static SnappyBlockCompression s_instance; |
752 | 86 | return &s_instance; |
753 | 86 | } |
754 | 1 | ~SnappyBlockCompression() override = default; |
755 | | |
756 | 34 | Status compress(const Slice& input, faststring* output) override { |
757 | 34 | size_t max_len = max_compressed_len(input.size); |
758 | 34 | output->resize(max_len); |
759 | 34 | Slice s(*output); |
760 | | |
761 | 34 | snappy::RawCompress(input.data, input.size, s.data, &s.size); |
762 | 34 | output->resize(s.size); |
763 | 34 | return Status::OK(); |
764 | 34 | } |
765 | | |
766 | 76 | Status decompress(const Slice& input, Slice* output) override { |
767 | 76 | if (!snappy::RawUncompress(input.data, input.size, output->data)) { |
768 | 0 | return Status::InvalidArgument("Fail to do Snappy decompress"); |
769 | 0 | } |
770 | | // NOTE: GetUncompressedLength only takes O(1) time |
771 | 76 | snappy::GetUncompressedLength(input.data, input.size, &output->size); |
772 | 76 | return Status::OK(); |
773 | 76 | } |
774 | | |
775 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
776 | 1 | faststring* output) override { |
777 | 1 | auto max_len = max_compressed_len(uncompressed_size); |
778 | 1 | output->resize(max_len); |
779 | | |
780 | 1 | SnappySlicesSource source(inputs); |
781 | 1 | snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data())); |
782 | 1 | output->resize(snappy::Compress(&source, &sink)); |
783 | 1 | return Status::OK(); |
784 | 1 | } |
785 | | |
786 | 35 | size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); } |
787 | | }; |
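| | // Unlike raw LZ4 blocks, snappy's wire format starts with the uncompressed |
| | // length encoded as a varint, which is why decompress() above can recover |
| | // output->size via GetUncompressedLength() in O(1) with no extra metadata. |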
788 | | |
789 | | class HadoopSnappyBlockCompression : public SnappyBlockCompression { |
790 | | public: |
791 | 0 | static HadoopSnappyBlockCompression* instance() { |
792 | 0 | static HadoopSnappyBlockCompression s_instance; |
793 | 0 | return &s_instance; |
794 | 0 | } |
795 | 0 | ~HadoopSnappyBlockCompression() override = default; |
796 | | |
797 | | // hadoop uses block compression for snappy |
798 | | // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc |
799 | 0 | Status compress(const Slice& input, faststring* output) override { |
800 | | // keep consistent with hadoop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
801 | 0 | size_t snappy_block_size = config::snappy_compression_block_size; |
802 | 0 | size_t overhead = snappy_block_size / 6 + 32; |
803 | 0 | size_t max_input_size = snappy_block_size - overhead; |
804 | |
805 | 0 | size_t data_len = input.size; |
806 | 0 | char* data = input.data; |
807 | 0 | std::vector<OwnedSlice> buffers; |
808 | 0 | size_t out_len = 0; |
809 | |
810 | 0 | while (data_len > 0) { |
811 | 0 | size_t input_size = std::min(data_len, max_input_size); |
812 | 0 | Slice input_slice(data, input_size); |
813 | 0 | faststring output_data; |
814 | 0 | RETURN_IF_ERROR(SnappyBlockCompression::compress(input_slice, &output_data)); |
815 | 0 | out_len += output_data.size(); |
816 | | // the OwnedSlice will be moved here |
817 | 0 | buffers.push_back(output_data.build()); |
818 | 0 | data += input_size; |
819 | 0 | data_len -= input_size; |
820 | 0 | } |
821 | | |
822 | | // hadoop block compression: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ... |
823 | 0 | size_t total_output_len = 4 + 4 * buffers.size() + out_len; |
824 | 0 | output->resize(total_output_len); |
825 | 0 | char* output_buffer = (char*)output->data(); |
826 | 0 | BigEndian::Store32(output_buffer, input.get_size()); |
827 | 0 | output_buffer += 4; |
828 | 0 | for (const auto& buffer : buffers) { |
829 | 0 | auto slice = buffer.slice(); |
830 | 0 | BigEndian::Store32(output_buffer, slice.get_size()); |
831 | 0 | output_buffer += 4; |
832 | 0 | memcpy(output_buffer, slice.get_data(), slice.get_size()); |
833 | 0 | output_buffer += slice.get_size(); |
834 | 0 | } |
835 | |
836 | 0 | DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len); |
837 | |
838 | 0 | return Status::OK(); |
839 | 0 | } |
840 | | |
841 | 0 | Status decompress(const Slice& input, Slice* output) override { |
842 | 0 | return Status::InternalError("unimplemented: HadoopSnappyBlockCompression::decompress");
843 | 0 | } |
844 | | }; |
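| | // The `overhead` terms in the two hadoop codecs above mirror each library's |
| | // worst-case expansion bound: LZ4_compressBound(n) = n + n/255 + 16 and |
| | // snappy::MaxCompressedLength(n) = 32 + n + n/6, so max_input_size keeps one |
| | // compressed block within the configured block size. |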
845 | | |
846 | | class ZlibBlockCompression : public BlockCompressionCodec { |
847 | | public: |
848 | 2 | static ZlibBlockCompression* instance() { |
849 | 2 | static ZlibBlockCompression s_instance; |
850 | 2 | return &s_instance; |
851 | 2 | } |
852 | 1 | ~ZlibBlockCompression() override = default; |
853 | | |
854 | 5 | Status compress(const Slice& input, faststring* output) override { |
855 | 5 | size_t max_len = max_compressed_len(input.size); |
856 | 5 | output->resize(max_len); |
857 | 5 | Slice s(*output); |
858 | | |
859 | 5 | auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size); |
860 | 5 | if (zres == Z_MEM_ERROR) { |
861 | 0 | throw Exception(Status::MemoryLimitExceeded(fmt::format( |
862 | 0 | "ZLib compression failed due to memory allocation error, error={}, res={}", |
863 | 0 | zError(zres), zres))); |
864 | 5 | } else if (zres != Z_OK) { |
865 | 0 | return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres)); |
866 | 0 | } |
867 | 5 | output->resize(s.size); |
868 | 5 | return Status::OK(); |
869 | 5 | } |
870 | | |
871 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
872 | 1 | faststring* output) override { |
873 | 1 | size_t max_len = max_compressed_len(uncompressed_size); |
874 | 1 | output->resize(max_len); |
875 | | |
876 | 1 | z_stream zstrm; |
877 | 1 | zstrm.zalloc = Z_NULL; |
878 | 1 | zstrm.zfree = Z_NULL; |
879 | 1 | zstrm.opaque = Z_NULL; |
880 | 1 | auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION); |
881 | 1 | if (zres == Z_MEM_ERROR) { |
882 | 0 | throw Exception(Status::MemoryLimitExceeded( |
883 | 0 | "Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres)); |
884 | 1 | } else if (zres != Z_OK) { |
885 | 0 | return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
886 | 0 | zError(zres), zres); |
887 | 0 | } |
888 | | // we assume the output buffer is large enough for the compressed stream |
889 | 1 | zstrm.next_out = (Bytef*)output->data(); |
890 | 1 | zstrm.avail_out = output->size(); |
891 | 6 | for (int i = 0; i < inputs.size(); ++i) { |
892 | 5 | if (inputs[i].size == 0) { |
893 | 1 | continue; |
894 | 1 | } |
895 | 4 | zstrm.next_in = (Bytef*)inputs[i].data; |
896 | 4 | zstrm.avail_in = inputs[i].size; |
897 | 4 | int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH; |
898 | | |
899 | 4 | zres = deflate(&zstrm, flush); |
900 | 4 | if (zres != Z_OK && zres != Z_STREAM_END) { |
901 | 0 | return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
902 | 0 | zError(zres), zres); |
903 | 0 | } |
904 | 4 | } |
905 | | |
906 | 1 | output->resize(zstrm.total_out); |
907 | 1 | zres = deflateEnd(&zstrm); |
908 | 1 | if (zres == Z_DATA_ERROR) { |
909 | 0 | return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres), |
910 | 0 | zres); |
911 | 1 | } else if (zres != Z_OK) { |
912 | 0 | return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
913 | 0 | zError(zres), zres); |
914 | 0 | } |
915 | 1 | return Status::OK(); |
916 | 1 | } |
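| | // deflate() flush semantics used above: Z_NO_FLUSH lets zlib keep buffering |
| | // input across slices, while Z_FINISH on the last slice flushes everything |
| | // and terminates the stream (deflate returns Z_STREAM_END once done). |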
917 | | |
918 | 14 | Status decompress(const Slice& input, Slice* output) override { |
919 | 14 | size_t input_size = input.size; |
920 | 14 | auto zres = |
921 | 14 | ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); |
922 | 14 | if (zres == Z_DATA_ERROR) { |
923 | 1 | return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres)); |
924 | 13 | } else if (zres == Z_MEM_ERROR) { |
925 | 0 | throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}", |
926 | 0 | zError(zres))); |
927 | 13 | } else if (zres != Z_OK) { |
928 | 7 | return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres)); |
929 | 7 | } |
930 | 6 | return Status::OK(); |
931 | 14 | } |
932 | | |
933 | 6 | size_t max_compressed_len(size_t len) override { |
934 | | // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block |
935 | 6 | return len + 6 + 5 * ((len >> 14) + 1); |
936 | 6 | } |
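| | // Worked example of the bound above: for len = 32768 (two 16 KB blocks), |
| | // max = 32768 + 6 + 5 * ((32768 >> 14) + 1) = 32768 + 6 + 15 = 32789 bytes. |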
937 | | }; |
938 | | |
939 | | class Bzip2BlockCompression : public BlockCompressionCodec { |
940 | | public: |
941 | 0 | static Bzip2BlockCompression* instance() { |
942 | 0 | static Bzip2BlockCompression s_instance; |
943 | 0 | return &s_instance; |
944 | 0 | } |
945 | 0 | ~Bzip2BlockCompression() override = default; |
946 | | |
947 | 0 | Status compress(const Slice& input, faststring* output) override { |
948 | 0 | size_t max_len = max_compressed_len(input.size); |
949 | 0 | output->resize(max_len); |
950 | 0 | uint32_t size = output->size(); |
951 | 0 | auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data, |
952 | 0 | input.size, 9, 0, 0); |
953 | 0 | if (bzres == BZ_MEM_ERROR) { |
954 | 0 | throw Exception( |
955 | 0 | Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres)); |
956 | 0 | } else if (bzres == BZ_PARAM_ERROR) { |
957 | 0 | return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres); |
958 | 0 | } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK && |
959 | 0 | bzres != BZ_STREAM_END && bzres != BZ_OK) { |
960 | 0 | return Status::InternalError("Fail to do Bzip2 compress, status code: {}", bzres);
961 | 0 | } |
962 | 0 | output->resize(size); |
963 | 0 | return Status::OK(); |
964 | 0 | } |
965 | | |
966 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
967 | 0 | faststring* output) override { |
968 | 0 | size_t max_len = max_compressed_len(uncompressed_size); |
969 | 0 | output->resize(max_len); |
970 | |
971 | 0 | bz_stream bzstrm; |
972 | 0 | bzero(&bzstrm, sizeof(bzstrm)); |
973 | 0 | int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0); |
974 | 0 | if (bzres == BZ_PARAM_ERROR) { |
975 | 0 | return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); |
976 | 0 | } else if (bzres == BZ_MEM_ERROR) { |
977 | 0 | throw Exception( |
978 | 0 | Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres)); |
979 | 0 | } else if (bzres != BZ_OK) { |
980 | 0 | return Status::InternalError("Failed to init bz2. status code: {}", bzres); |
981 | 0 | } |
982 | | // we assume the output buffer is large enough for the compressed stream |
983 | 0 | bzstrm.next_out = (char*)output->data(); |
984 | 0 | bzstrm.avail_out = output->size(); |
985 | 0 | for (int i = 0; i < inputs.size(); ++i) { |
986 | 0 | if (inputs[i].size == 0) { |
987 | 0 | continue; |
988 | 0 | } |
989 | 0 | bzstrm.next_in = (char*)inputs[i].data; |
990 | 0 | bzstrm.avail_in = inputs[i].size; |
991 | 0 | int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN; |
992 | |
993 | 0 | bzres = BZ2_bzCompress(&bzstrm, flush); |
994 | 0 | if (bzres == BZ_PARAM_ERROR) { |
995 | 0 | return Status::InvalidArgument("Fail to do Bzip2 stream compress, status code: {}", bzres);
996 | 0 | } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
997 | 0 | bzres != BZ_STREAM_END && bzres != BZ_OK) {
998 | 0 | return Status::InternalError("Fail to do Bzip2 stream compress, status code: {}", bzres);
999 | 0 | } |
1000 | 0 | } |
1001 | | |
1002 | 0 | size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32; |
1003 | 0 | output->resize(total_out); |
1004 | 0 | bzres = BZ2_bzCompressEnd(&bzstrm); |
1005 | 0 | if (bzres == BZ_PARAM_ERROR) { |
1006 | 0 | return Status::InvalidArgument("Fail to do BZ2_bzCompressEnd on bzip2 stream, res={}", bzres);
1007 | 0 | } else if (bzres != BZ_OK) {
1008 | 0 | return Status::InternalError("Fail to do BZ2_bzCompressEnd on bzip2 stream, res={}", bzres);
1009 | 0 | } |
1010 | 0 | return Status::OK(); |
1011 | 0 | } |
1012 | | |
1013 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1014 | 0 | return Status::InternalError("unimplemented: Bzip2BlockCompression::decompress");
1015 | 0 | } |
1016 | | |
1017 | 0 | size_t max_compressed_len(size_t len) override { |
1018 | | // TODO: tighten this bound; bzlib documents a worst case of len + len/100 + 600 |
1019 | 0 | return len * 2; |
1020 | 0 | } |
1021 | | }; |
1022 | | |
1023 | | // ZSTD compression and decompression, offering both high speed and a high compression ratio |
1024 | | class ZstdBlockCompression : public BlockCompressionCodec { |
1025 | | private: |
1026 | | class CContext { |
1027 | | ENABLE_FACTORY_CREATOR(CContext); |
1028 | | |
1029 | | public: |
1030 | 1 | CContext() : ctx(nullptr) { |
1031 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
1032 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
1033 | 1 | buffer = std::make_unique<faststring>(); |
1034 | 1 | } |
1035 | | ZSTD_CCtx* ctx; |
1036 | | std::unique_ptr<faststring> buffer; |
1037 | 1 | ~CContext() { |
1038 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
1039 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
1040 | 1 | if (ctx) { |
1041 | 1 | ZSTD_freeCCtx(ctx); |
1042 | 1 | } |
1043 | 1 | buffer.reset(); |
1044 | 1 | } |
1045 | | }; |
1046 | | class DContext { |
1047 | | ENABLE_FACTORY_CREATOR(DContext); |
1048 | | |
1049 | | public: |
1050 | 6 | DContext() : ctx(nullptr) {} |
1051 | | ZSTD_DCtx* ctx; |
1052 | 6 | ~DContext() { |
1053 | 6 | if (ctx) { |
1054 | 6 | ZSTD_freeDCtx(ctx); |
1055 | 6 | } |
1056 | 6 | } |
1057 | | }; |
1058 | | |
1059 | | public: |
1060 | 359 | static ZstdBlockCompression* instance() { |
1061 | 359 | static ZstdBlockCompression s_instance; |
1062 | 359 | return &s_instance; |
1063 | 359 | } |
1064 | 1 | ~ZstdBlockCompression() { |
1065 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
1066 | 1 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
1067 | 1 | _ctx_c_pool.clear(); |
1068 | 1 | _ctx_d_pool.clear(); |
1069 | 1 | } |
1070 | | |
1071 | 280 | size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); } |
1072 | | |
1073 | 130 | Status compress(const Slice& input, faststring* output) override { |
1074 | 130 | std::vector<Slice> inputs {input}; |
1075 | 130 | return compress(inputs, input.size, output); |
1076 | 130 | } |
1077 | | |
1078 | | // follow ZSTD official example |
1079 | | // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c |
1080 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
1081 | 280 | faststring* output) override { |
1082 | 280 | std::unique_ptr<CContext> context; |
1083 | 280 | RETURN_IF_ERROR(_acquire_compression_ctx(context)); |
1084 | 280 | bool compress_failed = false; |
1085 | 280 | Defer defer {[&] { |
1086 | 280 | if (!compress_failed) { |
1087 | 280 | _release_compression_ctx(std::move(context)); |
1088 | 280 | } |
1089 | 280 | }}; |
1090 | | |
1091 | 280 | try { |
1092 | 280 | size_t max_len = max_compressed_len(uncompressed_size); |
1093 | 280 | Slice compressed_buf; |
1094 | 280 | if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
1095 | | // use output directly |
1096 | 4 | output->resize(max_len); |
1097 | 4 | compressed_buf.data = reinterpret_cast<char*>(output->data()); |
1098 | 4 | compressed_buf.size = max_len; |
1099 | 276 | } else { |
1100 | 276 | { |
1101 | 276 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( |
1102 | 276 | ExecEnv::GetInstance()->block_compression_mem_tracker()); |
1103 | | // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE |
1104 | 276 | context->buffer->resize(max_len); |
1105 | 276 | } |
1106 | 276 | compressed_buf.data = reinterpret_cast<char*>(context->buffer->data()); |
1107 | 276 | compressed_buf.size = max_len; |
1108 | 276 | } |
1109 | | |
1110 | | // set compression level to default 3 |
1111 | 280 | auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel, |
1112 | 280 | ZSTD_CLEVEL_DEFAULT); |
1113 | 280 | if (ZSTD_isError(ret)) { |
1114 | 0 | return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}", |
1115 | 0 | ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
1116 | 0 | } |
1117 | | // set checksum flag to 1 |
1118 | 280 | ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1); |
1119 | 280 | if (ZSTD_isError(ret)) { |
1120 | 0 | return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}", |
1121 | 0 | ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
1122 | 0 | } |
1123 | | |
1124 | 280 | ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0}; |
1125 | | |
1126 | 564 | for (size_t i = 0; i < inputs.size(); i++) { |
1127 | 284 | ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0}; |
1128 | | |
1129 | 284 | bool last_input = (i == inputs.size() - 1); |
1130 | 284 | auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue; |
1131 | | |
1132 | 284 | bool finished = false; |
1133 | 284 | do { |
1134 | | // do compress |
1135 | 284 | auto ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode); |
1136 | | |
1137 | 284 | if (ZSTD_isError(ret)) { |
1138 | 0 | compress_failed = true; |
1139 | 0 | return Status::InternalError("ZSTD_compressStream2 error: {}", |
1140 | 0 | ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
1141 | 0 | } |
1142 | | |
1143 | | // ret is ZSTD hint for needed output buffer size |
1144 | 284 | if (ret > 0 && out_buf.pos == out_buf.size) { |
1145 | 0 | compress_failed = true; |
1146 | 0 | return Status::InternalError("ZSTD_compressStream2 output buffer full"); |
1147 | 0 | } |
1148 | | |
1149 | 284 | finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size); |
1150 | 284 | } while (!finished); |
1151 | 284 | } |
1152 | | |
1153 | | // set compressed size for caller |
1154 | 280 | output->resize(out_buf.pos); |
1155 | 280 | if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) { |
1156 | 276 | output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos); |
1157 | 276 | } |
1158 | 280 | } catch (std::exception& e) { |
1159 | 0 | return Status::InternalError("Fail to do ZSTD compress due to exception {}", e.what()); |
1160 | 0 | } catch (...) { |
1161 | | // compress_failed is deliberately left false so the Defer releases the context |
1162 | 0 | DCHECK(!compress_failed); |
1163 | 0 | return Status::InternalError("Fail to do ZSTD compress due to exception"); |
1164 | 0 | } |
1165 | | |
1166 | 280 | return Status::OK(); |
1167 | 280 | } |
1168 | | |
1169 | 183 | Status decompress(const Slice& input, Slice* output) override { |
1170 | 183 | std::unique_ptr<DContext> context; |
1171 | 183 | bool decompress_failed = false; |
1172 | 183 | RETURN_IF_ERROR(_acquire_decompression_ctx(context)); |
1173 | 183 | Defer defer {[&] { |
1174 | 183 | if (!decompress_failed) { |
1175 | 178 | _release_decompression_ctx(std::move(context)); |
1176 | 178 | } |
1177 | 183 | }}; |
1178 | | |
1179 | 183 | size_t ret = ZSTD_decompressDCtx(context->ctx, output->data, output->size, input.data, |
1180 | 183 | input.size); |
1181 | 183 | if (ZSTD_isError(ret)) { |
1182 | 5 | decompress_failed = true; |
1183 | 5 | return Status::InternalError("ZSTD_decompressDCtx error: {}", |
1184 | 5 | ZSTD_getErrorString(ZSTD_getErrorCode(ret))); |
1185 | 5 | } |
1186 | | |
1187 | | // set decompressed size for caller |
1188 | 178 | output->size = ret; |
1189 | | |
1190 | 178 | return Status::OK(); |
1191 | 183 | } |
1192 | | |
1193 | | private: |
1194 | 280 | Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) { |
1195 | 280 | std::lock_guard<std::mutex> l(_ctx_c_mutex); |
1196 | 280 | if (_ctx_c_pool.empty()) { |
1197 | 1 | std::unique_ptr<CContext> localCtx = CContext::create_unique(); |
1198 | 1 | if (localCtx.get() == nullptr) { |
1199 | 0 | return Status::InvalidArgument("failed to new ZSTD CContext"); |
1200 | 0 | } |
1201 | |
1202 | 1 | localCtx->ctx = ZSTD_createCCtx(); |
1203 | 1 | if (localCtx->ctx == nullptr) { |
1204 | 0 | return Status::InvalidArgument("Failed to create ZSTD compress ctx"); |
1205 | 0 | } |
1206 | 1 | out = std::move(localCtx); |
1207 | 1 | return Status::OK(); |
1208 | 1 | } |
1209 | 279 | out = std::move(_ctx_c_pool.back()); |
1210 | 279 | _ctx_c_pool.pop_back(); |
1211 | 279 | return Status::OK(); |
1212 | 280 | } |
1213 | 280 | void _release_compression_ctx(std::unique_ptr<CContext> context) { |
1214 | 280 | DCHECK(context); |
1215 | 280 | auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only); |
1216 | 280 | DCHECK(!ZSTD_isError(ret)); |
1217 | 280 | std::lock_guard<std::mutex> l(_ctx_c_mutex); |
1218 | 280 | _ctx_c_pool.push_back(std::move(context)); |
1219 | 280 | } |
1220 | | |
1221 | 183 | Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) { |
1222 | 183 | std::lock_guard<std::mutex> l(_ctx_d_mutex); |
1223 | 183 | if (_ctx_d_pool.empty()) { |
1224 | 6 | std::unique_ptr<DContext> localCtx = DContext::create_unique(); |
1225 | 6 | if (localCtx.get() == nullptr) { |
1226 | 0 | return Status::InvalidArgument("failed to new ZSTD DContext"); |
1227 | 0 | } |
1228 | 6 | localCtx->ctx = ZSTD_createDCtx(); |
1229 | 6 | if (localCtx->ctx == nullptr) { |
1230 | 0 | return Status::InvalidArgument("Fail to init ZSTD decompress context"); |
1231 | 0 | } |
1232 | 6 | out = std::move(localCtx); |
1233 | 6 | return Status::OK(); |
1234 | 6 | } |
1235 | 177 | out = std::move(_ctx_d_pool.back()); |
1236 | 177 | _ctx_d_pool.pop_back(); |
1237 | 177 | return Status::OK(); |
1238 | 183 | } |
1239 | 178 | void _release_decompression_ctx(std::unique_ptr<DContext> context) { |
1240 | 178 | DCHECK(context); |
1241 | | // reset ctx to start a new decompress session |
1242 | 178 | auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only); |
1243 | 178 | DCHECK(!ZSTD_isError(ret)); |
1244 | 178 | std::lock_guard<std::mutex> l(_ctx_d_mutex); |
1245 | 178 | _ctx_d_pool.push_back(std::move(context)); |
1246 | 178 | } |
1247 | | |
1248 | | private: |
1249 | | mutable std::mutex _ctx_c_mutex; |
1250 | | mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool; |
1251 | | |
1252 | | mutable std::mutex _ctx_d_mutex; |
1253 | | mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool; |
1254 | | }; |
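| | // Minimal standalone sketch of the ZSTD_compressStream2() loop the codec |
| | // above wraps (single input, default level, error handling elided): |
| | // |
| | //   ZSTD_CCtx* cctx = ZSTD_createCCtx(); |
| | //   ZSTD_inBuffer in_buf = {src, src_size, 0}; |
| | //   ZSTD_outBuffer out_buf = {dst, ZSTD_compressBound(src_size), 0}; |
| | //   size_t remaining; |
| | //   do { |
| | //       remaining = ZSTD_compressStream2(cctx, &out_buf, &in_buf, ZSTD_e_end); |
| | //   } while (remaining != 0); // 0 means the frame is fully flushed |
| | //   ZSTD_freeCCtx(cctx);      // out_buf.pos is the compressed size |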
1255 | | |
1256 | | class GzipBlockCompression : public ZlibBlockCompression { |
1257 | | public: |
1258 | 0 | static GzipBlockCompression* instance() { |
1259 | 0 | static GzipBlockCompression s_instance; |
1260 | 0 | return &s_instance; |
1261 | 0 | } |
1262 | 0 | ~GzipBlockCompression() override = default; |
1263 | | |
1264 | 0 | Status compress(const Slice& input, faststring* output) override { |
1265 | 0 | size_t max_len = max_compressed_len(input.size); |
1266 | 0 | output->resize(max_len); |
1267 | |
1268 | 0 | z_stream z_strm = {}; |
1269 | 0 | z_strm.zalloc = Z_NULL; |
1270 | 0 | z_strm.zfree = Z_NULL; |
1271 | 0 | z_strm.opaque = Z_NULL; |
1272 | |
1273 | 0 | int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
1274 | 0 | 8, Z_DEFAULT_STRATEGY); |
1275 | |
1276 | 0 | if (zres == Z_MEM_ERROR) { |
1277 | 0 | throw Exception(Status::MemoryLimitExceeded( |
1278 | 0 | "Fail to init ZLib compress, error={}, res={}", zError(zres), zres)); |
1279 | 0 | } else if (zres != Z_OK) { |
1280 | 0 | return Status::InternalError("Fail to init ZLib compress, error={}, res={}", |
1281 | 0 | zError(zres), zres); |
1282 | 0 | } |
1283 | | |
1284 | 0 | z_strm.next_in = (Bytef*)input.get_data(); |
1285 | 0 | z_strm.avail_in = input.get_size(); |
1286 | 0 | z_strm.next_out = (Bytef*)output->data(); |
1287 | 0 | z_strm.avail_out = output->size(); |
1288 | |
1289 | 0 | zres = deflate(&z_strm, Z_FINISH); |
1290 | 0 | if (zres != Z_OK && zres != Z_STREAM_END) { |
1291 | 0 | return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
1292 | 0 | zError(zres), zres); |
1293 | 0 | } |
1294 | | |
1295 | 0 | output->resize(z_strm.total_out); |
1296 | 0 | zres = deflateEnd(&z_strm); |
1297 | 0 | if (zres == Z_DATA_ERROR) { |
1298 | 0 | return Status::InvalidArgument("Fail to end zlib compress"); |
1299 | 0 | } else if (zres != Z_OK) { |
1300 | 0 | return Status::InternalError("Fail to end zlib compress"); |
1301 | 0 | } |
1302 | 0 | return Status::OK(); |
1303 | 0 | } |
1304 | | |
1305 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
1306 | 0 | faststring* output) override { |
1307 | 0 | size_t max_len = max_compressed_len(uncompressed_size); |
1308 | 0 | output->resize(max_len); |
1309 | |
1310 | 0 | z_stream zstrm; |
1311 | 0 | zstrm.zalloc = Z_NULL; |
1312 | 0 | zstrm.zfree = Z_NULL; |
1313 | 0 | zstrm.opaque = Z_NULL; |
1314 | 0 | auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
1315 | 0 | 8, Z_DEFAULT_STRATEGY); |
1316 | 0 | if (zres == Z_MEM_ERROR) { |
1317 | 0 | throw Exception(Status::MemoryLimitExceeded( |
1318 | 0 | "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres)); |
1319 | 0 | } else if (zres != Z_OK) { |
1320 | 0 | return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}", |
1321 | 0 | zError(zres), zres); |
1322 | 0 | } |
1323 | | |
1324 | |         // we assume that output is big enough to hold all the compressed data
1325 | 0 | zstrm.next_out = (Bytef*)output->data(); |
1326 | 0 | zstrm.avail_out = output->size(); |
1327 | 0 | for (int i = 0; i < inputs.size(); ++i) { |
1328 | 0 | if (inputs[i].size == 0) { |
1329 | 0 | continue; |
1330 | 0 | } |
1331 | 0 | zstrm.next_in = (Bytef*)inputs[i].data; |
1332 | 0 | zstrm.avail_in = inputs[i].size; |
1333 | 0 | int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH; |
1334 | |
1335 | 0 | zres = deflate(&zstrm, flush); |
1336 | 0 | if (zres != Z_OK && zres != Z_STREAM_END) { |
1337 | 0 | return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
1338 | 0 | zError(zres), zres); |
1339 | 0 | } |
1340 | 0 | } |
1341 | | |
1342 | 0 | output->resize(zstrm.total_out); |
1343 | 0 | zres = deflateEnd(&zstrm); |
1344 | 0 | if (zres == Z_DATA_ERROR) { |
1345 | 0 | return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
1346 | 0 | zError(zres), zres); |
1347 | 0 | } else if (zres != Z_OK) { |
1348 | 0 | return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
1349 | 0 | zError(zres), zres); |
1350 | 0 | } |
1351 | 0 | return Status::OK(); |
1352 | 0 | } |
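
    // [Editorial sketch, not part of the original source] The loop above
    // fuses several input buffers into a single deflate stream: every chunk
    // but the last is fed with Z_NO_FLUSH, and only the last uses Z_FINISH so
    // the stream is terminated exactly once. Isolated, the idiom looks like
    // this; the name deflate_chunks_sketch is illustrative, and the stream is
    // assumed to be initialized with next_out/avail_out already set.
    static int deflate_chunks_sketch(z_stream* strm, const std::vector<Slice>& chunks) {
        int zres = Z_OK;
        for (size_t i = 0; i < chunks.size(); ++i) {
            // Skip empty chunks, mirroring the code above. Caveat: a trailing
            // empty chunk would then leave the stream unterminated.
            if (chunks[i].size == 0) {
                continue;
            }
            strm->next_in = reinterpret_cast<Bytef*>(chunks[i].data);
            strm->avail_in = static_cast<uInt>(chunks[i].size);
            zres = deflate(strm, i + 1 == chunks.size() ? Z_FINISH : Z_NO_FLUSH);
            if (zres != Z_OK && zres != Z_STREAM_END) {
                break; // the caller maps zres to a Status
            }
        }
        return zres;
    }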
1353 | | |
1354 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1355 | 0 | z_stream z_strm = {}; |
1356 | 0 | z_strm.zalloc = Z_NULL; |
1357 | 0 | z_strm.zfree = Z_NULL; |
1358 | 0 | z_strm.opaque = Z_NULL; |
1359 | |
1360 | 0 | int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC); |
1361 | 0 | if (ret != Z_OK) { |
1362 | 0 | return Status::InternalError("Fail to init ZLib decompress, error={}, res={}", |
1363 | 0 | zError(ret), ret); |
1364 | 0 | } |
1365 | | |
1366 | | // 1. set input and output |
1367 | 0 | z_strm.next_in = reinterpret_cast<Bytef*>(input.data); |
1368 | 0 | z_strm.avail_in = input.size; |
1369 | 0 | z_strm.next_out = reinterpret_cast<Bytef*>(output->data); |
1370 | 0 | z_strm.avail_out = output->size; |
1371 | |
1372 | 0 | if (z_strm.avail_out > 0) { |
1373 | |             // We only support the non-streaming use case for the block decompressor.
1374 | 0 | ret = inflate(&z_strm, Z_FINISH); |
1375 | 0 | if (ret != Z_OK && ret != Z_STREAM_END) { |
1376 | 0 | (void)inflateEnd(&z_strm); |
1377 | 0 | if (ret == Z_MEM_ERROR) { |
1378 | 0 |                 throw Exception(Status::MemoryLimitExceeded(
1379 | 0 |                         "Fail to do ZLib stream decompress, error={}, res={}", zError(ret), ret));
1380 | 0 |             } else if (ret == Z_DATA_ERROR) {
1381 | 0 |                 return Status::InvalidArgument(
1382 | 0 |                         "Fail to do ZLib stream decompress, error={}, res={}", zError(ret), ret);
1383 | 0 |             }
1384 | 0 |             return Status::InternalError("Fail to do ZLib stream decompress, error={}, res={}",
1385 | 0 |                                          zError(ret), ret);
1386 | 0 | } |
1387 | 0 | } |
1388 | 0 | (void)inflateEnd(&z_strm); |
1389 | |
1390 | 0 | return Status::OK(); |
1391 | 0 | } |
1392 | | |
1393 | 0 | size_t max_compressed_len(size_t len) override { |
1394 | 0 | z_stream zstrm; |
1395 | 0 | zstrm.zalloc = Z_NULL; |
1396 | 0 | zstrm.zfree = Z_NULL; |
1397 | 0 | zstrm.opaque = Z_NULL; |
1398 | 0 | auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
1399 | 0 | MEM_LEVEL, Z_DEFAULT_STRATEGY); |
1400 | 0 | if (zres != Z_OK) { |
1401 | |             // Fall back to zlib's estimate logic for deflate; note that this
1402 | |             // may cause a decompression error.
1403 | 0 | LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres) |
1404 | 0 | << ", res=" << zres; |
1405 | 0 | return ZlibBlockCompression::max_compressed_len(len); |
1406 | 0 | } else { |
1407 | 0 | zres = deflateEnd(&zstrm); |
1408 | 0 | if (zres != Z_OK) { |
1409 | 0 | LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres) |
1410 | 0 | << ", res=" << zres; |
1411 | 0 | } |
1412 | |             // Mark Adler, maintainer of zlib, has stated that 12 needs to be
1413 | |             // added to the result for gzip:
1414 | |             // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
1415 | |             // To have a safe upper bound for "wrapper variations", we add 32 to
1416 | |             // the estimate.
1417 | 0 | int upper_bound = deflateBound(&zstrm, len) + 32; |
1418 | 0 | return upper_bound; |
1419 | 0 | } |
1420 | 0 | } |
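
    // [Editorial sketch, not part of the original source] The sizing pattern
    // this bound enables: reserve the worst case before compressing, then
    // shrink to total_out afterwards, so deflate never runs out of output
    // space mid-stream. deflateBound() is a guaranteed upper bound for the
    // stream as configured; the extra 32 bytes is the same defensive pad for
    // gzip wrapper variations. The name gzip_bound_sketch is illustrative.
    static size_t gzip_bound_sketch(size_t len) {
        z_stream strm = {};
        if (deflateInit2(&strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, MEM_LEVEL,
                         Z_DEFAULT_STRATEGY) != Z_OK) {
            return 0; // callers must treat 0 as "bound unavailable"
        }
        size_t bound = deflateBound(&strm, static_cast<uLong>(len)) + 32;
        deflateEnd(&strm);
        return bound;
    }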
1421 | | |
1422 | | private: |
1423 | | // Magic number for zlib, see https://zlib.net/manual.html for more details. |
1424 | | const static int GZIP_CODEC = 16; // gzip |
1425 | | // The memLevel parameter specifies how much memory should be allocated for |
1426 | | // the internal compression state. |
1427 | | const static int MEM_LEVEL = 8; |
1428 | | }; |
1429 | | |
1430 | | // Only used on x86 or x86_64 |
1431 | | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
1432 | | defined(__i386) || defined(_M_IX86) |
1433 | | class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression { |
1434 | | public: |
1435 | 0 | GzipBlockCompressionByLibdeflate() : GzipBlockCompression() {} |
1436 | 0 | static GzipBlockCompressionByLibdeflate* instance() { |
1437 | 0 | static GzipBlockCompressionByLibdeflate s_instance; |
1438 | 0 | return &s_instance; |
1439 | 0 | } |
1440 | 0 | ~GzipBlockCompressionByLibdeflate() override = default; |
1441 | | |
1442 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1443 | 0 | if (input.empty()) { |
1444 | 0 | output->size = 0; |
1445 | 0 | return Status::OK(); |
1446 | 0 | } |
1447 | 0 | thread_local std::unique_ptr<libdeflate_decompressor, void (*)(libdeflate_decompressor*)> |
1448 | 0 | decompressor {libdeflate_alloc_decompressor(), libdeflate_free_decompressor}; |
1449 | 0 | if (!decompressor) { |
1450 | 0 | return Status::InternalError("libdeflate_alloc_decompressor error."); |
1451 | 0 | } |
1452 | 0 | std::size_t out_len; |
1453 | 0 | auto result = libdeflate_gzip_decompress(decompressor.get(), input.data, input.size, |
1454 | 0 | output->data, output->size, &out_len); |
1455 | 0 | if (result != LIBDEFLATE_SUCCESS) { |
1456 | 0 | return Status::InternalError("libdeflate_gzip_decompress error, res={}", result); |
1457 | 0 | } |
1458 | 0 | return Status::OK(); |
1459 | 0 | } |
1460 | | }; |
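
// [Editorial sketch, not part of the original source] libdeflate suits this
// codec because block decompression already knows the exact uncompressed
// size, matching libdeflate's whole-buffer, no-streaming model. The compress
// side follows the same shape; this assumes libdeflate's documented
// compressor API (libdeflate_alloc_compressor, libdeflate_gzip_compress_bound,
// libdeflate_gzip_compress), which this file itself does not use.
static bool libdeflate_gzip_compress_sketch(const void* in, size_t in_len, std::string* out) {
    libdeflate_compressor* c = libdeflate_alloc_compressor(6); // level 6 ~ zlib default
    if (c == nullptr) {
        return false;
    }
    out->resize(libdeflate_gzip_compress_bound(c, in_len)); // worst-case size
    size_t n = libdeflate_gzip_compress(c, in, in_len, out->data(), out->size());
    libdeflate_free_compressor(c);
    if (n == 0) { // 0 means the output buffer was too small
        return false;
    }
    out->resize(n); // shrink to the actual compressed size
    return true;
}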
1461 | | #endif |
1462 | | |
1463 | | class LzoBlockCompression final : public BlockCompressionCodec { |
1464 | | public: |
1465 | 0 | static LzoBlockCompression* instance() { |
1466 | 0 | static LzoBlockCompression s_instance; |
1467 | 0 | return &s_instance; |
1468 | 0 | } |
1469 | | |
1470 | 0 | Status compress(const Slice& input, faststring* output) override { |
1471 | 0 | return Status::InvalidArgument("not impl lzo compress."); |
1472 | 0 | } |
1473 | 0 |     size_t max_compressed_len(size_t len) override { return 0; }
1474 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1475 | 0 | auto* input_ptr = input.data; |
1476 | 0 | auto remain_input_size = input.size; |
1477 | 0 | auto* output_ptr = output->data; |
1478 | 0 | auto remain_output_size = output->size; |
1479 | 0 | auto* output_limit = output->data + output->size; |
1480 | | |
1481 | |         // Example:
1482 | |         // OriginData (the original data is divided into several large data blocks):
1483 | |         //      large data block1 | large data block2 | large data block3 | ....
1484 | |         // Each large data block is in turn divided into several small data blocks.
1485 | |         // Suppose a large data block is divided into three small blocks:
1486 | |         // large data block1: | small block1 | small block2 | small block3 |
1487 | |         // CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block2) ]  [B3 compress(small block3)]>
1488 | |         //
1489 | |         // A : uncompressed length of the current large data block.
1490 | |         // sizeof(A) = 4 bytes.
1491 | |         // A = length(small block1) + length(small block2) + length(small block3)
1492 | |         // Bx : compressed length of small data block x.
1493 | |         // sizeof(Bx) = 4 bytes.
1494 | |         // Bx = length(compress(small blockx)); a standalone framing sketch follows this class.
1495 | 0 | try { |
1496 | 0 | while (remain_input_size > 0) { |
1497 | 0 | if (remain_input_size < 4) { |
1498 | 0 | return Status::InvalidArgument( |
1499 | 0 | "Need more input buffer to get large_block_uncompressed_len."); |
1500 | 0 | } |
1501 | | |
1502 | 0 | uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr); |
1503 | 0 | input_ptr += 4; |
1504 | 0 | remain_input_size -= 4; |
1505 | |
1506 | 0 | if (remain_output_size < large_block_uncompressed_len) { |
1507 | 0 | return Status::InvalidArgument( |
1508 | 0 | "Need more output buffer to get uncompressed data."); |
1509 | 0 | } |
1510 | | |
1511 | 0 | while (large_block_uncompressed_len > 0) { |
1512 | 0 | if (remain_input_size < 4) { |
1513 | 0 | return Status::InvalidArgument( |
1514 | 0 | "Need more input buffer to get small_block_compressed_len."); |
1515 | 0 | } |
1516 | | |
1517 | 0 | uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr); |
1518 | 0 | input_ptr += 4; |
1519 | 0 | remain_input_size -= 4; |
1520 | |
1521 | 0 | if (remain_input_size < small_block_compressed_len) { |
1522 | 0 | return Status::InvalidArgument( |
1523 | 0 | "Need more input buffer to decompress small block."); |
1524 | 0 | } |
1525 | | |
1526 | 0 | auto small_block_uncompressed_len = |
1527 | 0 | orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len, |
1528 | 0 | output_ptr, output_limit); |
1529 | |
1530 | 0 | input_ptr += small_block_compressed_len; |
1531 | 0 | remain_input_size -= small_block_compressed_len; |
1532 | |
1533 | 0 | output_ptr += small_block_uncompressed_len; |
1534 | 0 | large_block_uncompressed_len -= small_block_uncompressed_len; |
1535 | 0 | remain_output_size -= small_block_uncompressed_len; |
1536 | 0 | } |
1537 | 0 | } |
1538 | 0 | } catch (const orc::ParseError& e) { |
1539 | |             // Prevent the BE from hanging when orc::lzoDecompress throws an exception.
1540 | 0 | return Status::InternalError("Fail to do LZO decompress, error={}", e.what()); |
1541 | 0 | } |
1542 | 0 | return Status::OK(); |
1543 | 0 | } |
1544 | | }; |
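
// [Editorial sketch, not part of the original source] The inverse of the
// parser above: building one "large block" frame in the <A [B1 data] [B2
// data] ...> layout, with every length written as a 4-byte big-endian prefix.
// Assumes gutil's BigEndian::Store32, the counterpart of the Load32 used
// above; the name make_lzo_frame_sketch is illustrative, and the small blocks
// are assumed to be already LZO-compressed.
static std::string make_lzo_frame_sketch(uint32_t total_uncompressed_len,
                                         const std::vector<std::string>& compressed_small_blocks) {
    std::string frame;
    char buf[4];
    BigEndian::Store32(buf, total_uncompressed_len); // A: uncompressed length of the large block
    frame.append(buf, 4);
    for (const auto& blk : compressed_small_blocks) {
        BigEndian::Store32(buf, static_cast<uint32_t>(blk.size())); // Bx: compressed length
        frame.append(buf, 4);
        frame.append(blk); // compressed bytes of small block x
    }
    return frame;
}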
1545 | | |
1546 | | class BrotliBlockCompression final : public BlockCompressionCodec { |
1547 | | public: |
1548 | 0 | static BrotliBlockCompression* instance() { |
1549 | 0 | static BrotliBlockCompression s_instance; |
1550 | 0 | return &s_instance; |
1551 | 0 | } |
1552 | | |
1553 | 0 | Status compress(const Slice& input, faststring* output) override { |
1554 | 0 | return Status::InvalidArgument("not impl brotli compress."); |
1555 | 0 | } |
1556 | | |
1557 | 0 |     size_t max_compressed_len(size_t len) override { return 0; }
1558 | | |
1559 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1560 | |         // The size of the output buffer is always equal to the uncompressed length.
1561 | 0 | BrotliDecoderResult result = BrotliDecoderDecompress( |
1562 | 0 | input.get_size(), reinterpret_cast<const uint8_t*>(input.get_data()), &output->size, |
1563 | 0 | reinterpret_cast<uint8_t*>(output->data)); |
1564 | 0 | if (result != BROTLI_DECODER_RESULT_SUCCESS) { |
1565 | 0 | return Status::InternalError("Brotli decompression failed, result={}", result); |
1566 | 0 | } |
1567 | 0 | return Status::OK(); |
1568 | 0 | } |
1569 | | }; |
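
// [Editorial sketch, not part of the original source] BrotliDecoderDecompress
// is a one-shot API whose size argument is in/out: it holds the output
// capacity on entry and the decoded length on return. That is why the codec
// above can pass &output->size directly when the exact uncompressed length is
// known. The name brotli_decompress_sketch is illustrative.
static bool brotli_decompress_sketch(const uint8_t* in, size_t in_len, uint8_t* out,
                                     size_t out_cap, size_t* out_len) {
    *out_len = out_cap; // capacity in, actual decoded size out
    return BrotliDecoderDecompress(in_len, in, out_len, out) == BROTLI_DECODER_RESULT_SUCCESS;
}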
1570 | | |
1571 | | Status get_block_compression_codec(segment_v2::CompressionTypePB type, |
1572 | 25.6k | BlockCompressionCodec** codec) { |
1573 | 25.6k | switch (type) { |
1574 | 130 | case segment_v2::CompressionTypePB::NO_COMPRESSION: |
1575 | 130 | *codec = nullptr; |
1576 | 130 | break; |
1577 | 47 | case segment_v2::CompressionTypePB::SNAPPY: |
1578 | 47 | *codec = SnappyBlockCompression::instance(); |
1579 | 47 | break; |
1580 | 59 | case segment_v2::CompressionTypePB::LZ4: |
1581 | 59 | *codec = Lz4BlockCompression::instance(); |
1582 | 59 | break; |
1583 | 25.0k | case segment_v2::CompressionTypePB::LZ4F: |
1584 | 25.0k | *codec = Lz4fBlockCompression::instance(); |
1585 | 25.0k | break; |
1586 | 2 | case segment_v2::CompressionTypePB::LZ4HC: |
1587 | 2 | *codec = Lz4HCBlockCompression::instance(); |
1588 | 2 | break; |
1589 | 2 | case segment_v2::CompressionTypePB::ZLIB: |
1590 | 2 | *codec = ZlibBlockCompression::instance(); |
1591 | 2 | break; |
1592 | 359 | case segment_v2::CompressionTypePB::ZSTD: |
1593 | 359 | *codec = ZstdBlockCompression::instance(); |
1594 | 359 | break; |
1595 | 0 | default: |
1596 | 0 | return Status::InternalError("unknown compression type({})", type); |
1597 | 25.6k | } |
1598 | | |
1599 | 25.6k | return Status::OK(); |
1600 | 25.6k | } |
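
// [Editorial sketch, not part of the original source] Typical call pattern
// for the lookup above. The returned codec is a process-wide singleton, so
// the caller neither owns nor frees it, and NO_COMPRESSION yields nullptr,
// which must be handled explicitly. The name compress_block_sketch is
// illustrative.
static Status compress_block_sketch(segment_v2::CompressionTypePB type, const Slice& raw,
                                    faststring* out) {
    BlockCompressionCodec* codec = nullptr;
    RETURN_IF_ERROR(get_block_compression_codec(type, &codec));
    if (codec == nullptr) {
        // NO_COMPRESSION: pass the bytes through unchanged.
        out->clear();
        out->append(raw.data, raw.size);
        return Status::OK();
    }
    return codec->compress(raw, out);
}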
1601 | | |
1602 | | // this can only be used for hive text writing
1603 | 0 | Status get_block_compression_codec(TFileCompressType::type type, BlockCompressionCodec** codec) { |
1604 | 0 | switch (type) { |
1605 | 0 | case TFileCompressType::PLAIN: |
1606 | 0 | *codec = nullptr; |
1607 | 0 | break; |
1608 | 0 | case TFileCompressType::ZLIB: |
1609 | 0 | *codec = ZlibBlockCompression::instance(); |
1610 | 0 | break; |
1611 | 0 | case TFileCompressType::GZ: |
1612 | 0 | *codec = GzipBlockCompression::instance(); |
1613 | 0 | break; |
1614 | 0 | case TFileCompressType::BZ2: |
1615 | 0 | *codec = Bzip2BlockCompression::instance(); |
1616 | 0 | break; |
1617 | 0 | case TFileCompressType::LZ4BLOCK: |
1618 | 0 | *codec = HadoopLz4BlockCompression::instance(); |
1619 | 0 | break; |
1620 | 0 | case TFileCompressType::SNAPPYBLOCK: |
1621 | 0 | *codec = HadoopSnappyBlockCompression::instance(); |
1622 | 0 | break; |
1623 | 0 | case TFileCompressType::ZSTD: |
1624 | 0 | *codec = ZstdBlockCompression::instance(); |
1625 | 0 | break; |
1626 | 0 | default: |
1627 | 0 |         return Status::InternalError("unsupported compression type({}) in hive text", type);
1628 | 0 | } |
1629 | | |
1630 | 0 | return Status::OK(); |
1631 | 0 | } |
1632 | | |
1633 | | Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec, |
1634 | 39 | BlockCompressionCodec** codec) { |
1635 | 39 | switch (parquet_codec) { |
1636 | 0 | case tparquet::CompressionCodec::UNCOMPRESSED: |
1637 | 0 | *codec = nullptr; |
1638 | 0 | break; |
1639 | 39 | case tparquet::CompressionCodec::SNAPPY: |
1640 | 39 | *codec = SnappyBlockCompression::instance(); |
1641 | 39 | break; |
1642 | 0 |     case tparquet::CompressionCodec::LZ4_RAW: // the LZ4 block codec can also parse LZ4_RAW
1643 | 0 | case tparquet::CompressionCodec::LZ4: |
1644 | 0 | *codec = HadoopLz4BlockCompression::instance(); |
1645 | 0 | break; |
1646 | 0 | case tparquet::CompressionCodec::ZSTD: |
1647 | 0 | *codec = ZstdBlockCompression::instance(); |
1648 | 0 | break; |
1649 | 0 | case tparquet::CompressionCodec::GZIP: |
1650 | | // Only used on x86 or x86_64 |
1651 | 0 | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
1652 | 0 | defined(__i386) || defined(_M_IX86) |
1653 | 0 | *codec = GzipBlockCompressionByLibdeflate::instance(); |
1654 | | #else |
1655 | | *codec = GzipBlockCompression::instance(); |
1656 | | #endif |
1657 | 0 | break; |
1658 | 0 | case tparquet::CompressionCodec::LZO: |
1659 | 0 | *codec = LzoBlockCompression::instance(); |
1660 | 0 | break; |
1661 | 0 | case tparquet::CompressionCodec::BROTLI: |
1662 | 0 | *codec = BrotliBlockCompression::instance(); |
1663 | 0 | break; |
1664 | 0 | default: |
1665 | 0 | return Status::InternalError("unknown compression type({})", parquet_codec); |
1666 | 39 | } |
1667 | | |
1668 | 39 | return Status::OK(); |
1669 | 39 | } |
1670 | | |
1671 | | } // namespace doris |