be/src/util/block_compression.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "util/block_compression.h" |
19 | | |
20 | | #include <bzlib.h> |
21 | | #include <gen_cpp/parquet_types.h> |
22 | | #include <gen_cpp/segment_v2.pb.h> |
23 | | #include <glog/logging.h> |
24 | | |
25 | | #include <exception> |
26 | | // Only used on x86 or x86_64 |
27 | | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
28 | | defined(__i386) || defined(_M_IX86) |
29 | | #include <libdeflate.h> |
30 | | #endif |
31 | | #include <brotli/decode.h> |
32 | | #include <glog/log_severity.h> |
33 | | #include <glog/logging.h> |
34 | | #include <lz4/lz4.h> |
35 | | #include <lz4/lz4frame.h> |
36 | | #include <lz4/lz4hc.h> |
37 | | #include <snappy/snappy-sinksource.h> |
38 | | #include <snappy/snappy.h> |
39 | | #include <zconf.h> |
40 | | #include <zlib.h> |
41 | | #include <zstd.h> |
42 | | #include <zstd_errors.h> |
43 | | |
44 | | #include <algorithm> |
45 | | #include <cstdint> |
46 | | #include <limits> |
47 | | #include <mutex> |
48 | | #include <orc/Exceptions.hh> |
49 | | #include <ostream> |
50 | | |
51 | | #include "absl/strings/substitute.h" |
52 | | #include "common/config.h" |
53 | | #include "common/factory_creator.h" |
54 | | #include "exec/common/endian.h" |
55 | | #include "runtime/thread_context.h" |
56 | | #include "util/decompressor.h" |
57 | | #include "util/defer_op.h" |
58 | | #include "util/faststring.h" |
59 | | |
60 | | namespace orc { |
61 | | /** |
62 | | * Decompress the bytes in to the output buffer. |
63 | | * @param inputAddress the start of the input |
64 | | * @param inputLimit one past the last byte of the input |
65 | | * @param outputAddress the start of the output buffer |
66 | | * @param outputLimit one past the last byte of the output buffer |
67 | | * @result the number of bytes decompressed |
68 | | */ |
69 | | uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress, |
70 | | char* outputLimit); |
71 | | } // namespace orc |
72 | | |
73 | | namespace doris { |
74 | | |
75 | | // exception safe |
76 | | Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
77 | 307 | faststring* output) { |
78 | 307 | faststring buf; |
79 | | // we compute total size to avoid more memory copy |
80 | 307 | buf.reserve(uncompressed_size); |
81 | 366 | for (auto& input : inputs) { |
82 | 366 | buf.append(input.data, input.size); |
83 | 366 | } |
84 | 307 | return compress(buf, output); |
85 | 307 | } |
86 | | |
87 | 362k | bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) { |
88 | 362k | return uncompressed_size > std::numeric_limits<int32_t>::max(); |
89 | 362k | } |
90 | | |
// Raw LZ4 block codec. Compression contexts (LZ4 stream state + a scratch
// output buffer) are pooled and reused across calls to avoid per-call
// allocation; see _acquire_compression_ctx/_release_compression_ctx.
class Lz4BlockCompression : public BlockCompressionCodec {
private:
    // One pooled compression unit: the LZ4 stream plus a reusable scratch
    // buffer for compressed output.
    class Context {
        ENABLE_FACTORY_CREATOR(Context);

    public:
        Context() : ctx(nullptr) {
            // The scratch buffer outlives any single query, so charge its
            // memory to the process-global block-compression tracker.
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        LZ4_stream_t* ctx;
        std::unique_ptr<faststring> buffer;
        ~Context() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                LZ4_freeStream(ctx);
            }
            buffer.reset();
        }
    };

public:
    static Lz4BlockCompression* instance() {
        static Lz4BlockCompression s_instance;
        return &s_instance;
    }
    ~Lz4BlockCompression() override {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_pool.clear();
    }

    // Compresses `input` into `output`. Fails fast if the input exceeds LZ4's
    // hard input-size limit.
    Status compress(const Slice& input, faststring* output) override {
        if (input.size > LZ4_MAX_INPUT_SIZE) {
            return Status::InvalidArgument(
                    "LZ4 not support those case(input.size>LZ4_MAX_INPUT_SIZE), maybe you should "
                    "change "
                    "fragment_transmission_compression_codec to snappy, input.size={}, "
                    "LZ4_MAX_INPUT_SIZE={}",
                    input.size, LZ4_MAX_INPUT_SIZE);
        }

        std::unique_ptr<Context> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        // Return the context to the pool only on success; on failure the
        // context (whose stream state may be inconsistent) is dropped and
        // freed by the unique_ptr.
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            Slice compressed_buf;
            size_t max_len = max_compressed_len(input.size);
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // use output directly
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
                {
                    // context->buffer is reusable between queries, so its memory
                    // should be accounted to the global tracker.
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            // input.size is already checked before;
            // compressed_buf.size is got from max_compressed_len, which is
            // the return value of LZ4_compressBound, so it is safe to cast to int
            size_t compressed_len = LZ4_compress_fast_continue(
                    context->ctx, input.data, compressed_buf.data, static_cast<int>(input.size),
                    static_cast<int>(compressed_buf.size), ACCELARATION);
            if (compressed_len == 0) {
                compress_failed = true;
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
                                               compressed_buf.size);
            }
            output->resize(compressed_len);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Scratch path: copy the compressed bytes out of the pooled buffer.
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
                                    compressed_len);
            }
        } catch (...) {
            // Do not set compress_failed to release context
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do LZ4Block compress due to exception");
        }
        return Status::OK();
    }

    // Decompresses into the caller-provided buffer; on success output->size is
    // updated to the actual decompressed length.
    Status decompress(const Slice& input, Slice* output) override {
        auto decompressed_len = LZ4_decompress_safe(
                input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size));
        if (decompressed_len < 0) {
            return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len);
        }
        output->size = decompressed_len;
        return Status::OK();
    }

    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); }

private:
    // reuse LZ4 compress stream
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
        std::lock_guard<std::mutex> l(_ctx_mutex);
        if (_ctx_pool.empty()) {
            std::unique_ptr<Context> localCtx = Context::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("new LZ4 context error");
            }
            localCtx->ctx = LZ4_createStream();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("LZ4_createStream error");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_pool.back());
        _ctx_pool.pop_back();
        return Status::OK();
    }
    void _release_compression_ctx(std::unique_ptr<Context> context) {
        DCHECK(context);
        // Reset stream state so the next user starts without a stale dictionary.
        LZ4_resetStream(context->ctx);
        std::lock_guard<std::mutex> l(_ctx_mutex);
        _ctx_pool.push_back(std::move(context));
    }

private:
    mutable std::mutex _ctx_mutex;
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
    static const int32_t ACCELARATION = 1;
};
233 | | |
// Hadoop-compatible LZ4: the input is split into blocks of at most
// config::lz4_compression_block_size bytes, each block compressed with raw LZ4
// and framed with big-endian length prefixes.
class HadoopLz4BlockCompression : public Lz4BlockCompression {
public:
    HadoopLz4BlockCompression() {
        Status st = Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor);
        if (!st.ok()) {
            // Singleton construction cannot return a Status, so escalate.
            throw Exception(Status::FatalError(
                    "HadoopLz4BlockCompression construction failed. status = {}", st));
        }
    }

    ~HadoopLz4BlockCompression() override = default;

    static HadoopLz4BlockCompression* instance() {
        static HadoopLz4BlockCompression s_instance;
        return &s_instance;
    }

    // hadoop use block compression for lz4
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
    Status compress(const Slice& input, faststring* output) override {
        // be same with hadoop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
        size_t lz4_block_size = config::lz4_compression_block_size;
        // Leave room for LZ4's worst-case expansion so each compressed block
        // still fits in lz4_block_size.
        size_t overhead = lz4_block_size / 255 + 16;
        size_t max_input_size = lz4_block_size - overhead;

        size_t data_len = input.size;
        char* data = input.data;
        std::vector<OwnedSlice> buffers;
        size_t out_len = 0;

        // Compress chunk by chunk; each chunk becomes one framed block below.
        while (data_len > 0) {
            size_t input_size = std::min(data_len, max_input_size);
            Slice input_slice(data, input_size);
            faststring output_data;
            RETURN_IF_ERROR(Lz4BlockCompression::compress(input_slice, &output_data));
            out_len += output_data.size();
            buffers.push_back(output_data.build());
            data += input_size;
            data_len -= input_size;
        }

        // hadoop block compression: uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
        output->resize(total_output_len);
        char* output_buffer = (char*)output->data();
        // 4-byte big-endian total uncompressed length header.
        BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size()));
        output_buffer += 4;
        for (const auto& buffer : buffers) {
            auto slice = buffer.slice();
            // Per-block: 4-byte big-endian compressed length, then the bytes.
            BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size()));
            output_buffer += 4;
            memcpy(output_buffer, slice.get_data(), slice.get_size());
            output_buffer += slice.get_size();
        }

        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);

        return Status::OK();
    }

    Status decompress(const Slice& input, Slice* output) override {
        size_t input_bytes_read = 0;
        size_t decompressed_len = 0;
        size_t more_input_bytes = 0;
        size_t more_output_bytes = 0;
        bool stream_end = false;
        auto st = _decompressor->decompress((uint8_t*)input.data, cast_set<uint32_t>(input.size),
                                            &input_bytes_read, (uint8_t*)output->data,
                                            cast_set<uint32_t>(output->size), &decompressed_len,
                                            &stream_end, &more_input_bytes, &more_output_bytes);
        // try decompress use hadoopLz4 ,if failed fall back lz4.
        // NOTE(review): on the successful hadoop path, output->size is left as
        // the caller-provided capacity rather than decompressed_len —
        // presumably callers pre-size the buffer exactly; verify at call sites.
        return (st != Status::OK() || stream_end != true)
                       ? Lz4BlockCompression::decompress(input, output)
                       : Status::OK();
    }

private:
    std::unique_ptr<Decompressor> _decompressor;
};
// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
// Maintains two separate context pools: CContext for compression (with a
// reusable scratch buffer) and DContext for decompression.
class Lz4fBlockCompression : public BlockCompressionCodec {
private:
    // Pooled LZ4F compression context plus a reusable scratch output buffer.
    class CContext {
        ENABLE_FACTORY_CREATOR(CContext);

    public:
        CContext() : ctx(nullptr) {
            // Scratch buffer is shared across queries — account to the global
            // block-compression tracker.
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        LZ4F_compressionContext_t ctx;
        std::unique_ptr<faststring> buffer;
        ~CContext() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                LZ4F_freeCompressionContext(ctx);
            }
            buffer.reset();
        }
    };
    // Pooled LZ4F decompression context (no scratch buffer needed).
    class DContext {
        ENABLE_FACTORY_CREATOR(DContext);

    public:
        DContext() : ctx(nullptr) {}
        LZ4F_decompressionContext_t ctx;
        ~DContext() {
            if (ctx) {
                LZ4F_freeDecompressionContext(ctx);
            }
        }
    };

public:
    static Lz4fBlockCompression* instance() {
        static Lz4fBlockCompression s_instance;
        return &s_instance;
    }
    ~Lz4fBlockCompression() {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_c_pool.clear();
        _ctx_d_pool.clear();
    }

    // Single-slice compress delegates to the multi-slice overload.
    Status compress(const Slice& input, faststring* output) override {
        std::vector<Slice> inputs {input};
        return compress(inputs, input.size, output);
    }

    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        return _compress(inputs, uncompressed_size, output);
    }

    Status decompress(const Slice& input, Slice* output) override {
        return _decompress(input, output);
    }

    // Worst-case frame size; takes the max of the streaming and one-shot
    // bounds since _compress uses the streaming API.
    size_t max_compressed_len(size_t len) override {
        return std::max(LZ4F_compressBound(len, &_s_preferences),
                        LZ4F_compressFrameBound(len, &_s_preferences));
    }

private:
    // Streams all input slices into one LZ4 frame:
    // compressBegin -> compressUpdate per slice -> compressEnd.
    Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                     faststring* output) {
        std::unique_ptr<CContext> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        // Pool the context again only on success; a failed context is dropped.
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            Slice compressed_buf;
            size_t max_len = max_compressed_len(uncompressed_size);
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // use output directly
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                {
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            // Frame header.
            auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
                                             &_s_preferences);
            if (LZ4F_isError(wbytes)) {
                compress_failed = true;
                return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
                                               LZ4F_getErrorName(wbytes));
            }
            size_t offset = wbytes;
            // Append each slice to the frame; offset tracks bytes written so far.
            for (auto input : inputs) {
                wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
                                             compressed_buf.size - offset, input.data, input.size,
                                             nullptr);
                if (LZ4F_isError(wbytes)) {
                    compress_failed = true;
                    return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
                                                   LZ4F_getErrorName(wbytes));
                }
                offset += wbytes;
            }
            // Flush remaining data and write the frame footer.
            wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
                                      compressed_buf.size - offset, nullptr);
            if (LZ4F_isError(wbytes)) {
                compress_failed = true;
                return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
                                               LZ4F_getErrorName(wbytes));
            }
            offset += wbytes;
            output->resize(offset);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Scratch path: copy out of the pooled buffer into output.
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
            }
        } catch (...) {
            // Do not set compress_failed to release context
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do LZ4F compress due to exception");
        }

        return Status::OK();
    }

    // One-shot frame decompress; expects a complete frame in `input` and a
    // destination buffer large enough for the whole decompressed payload.
    Status _decompress(const Slice& input, Slice* output) {
        bool decompress_failed = false;
        std::unique_ptr<DContext> context;
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
        Defer defer {[&] {
            if (!decompress_failed) {
                _release_decompression_ctx(std::move(context));
            }
        }};
        size_t input_size = input.size;
        auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data,
                                    &input_size, nullptr);
        if (LZ4F_isError(lres)) {
            decompress_failed = true;
            return Status::InternalError("Fail to do LZ4F decompress, res={}",
                                         LZ4F_getErrorName(lres));
        } else if (input_size != input.size) {
            // Not all compressed bytes were consumed — input is malformed.
            decompress_failed = true;
            return Status::InvalidArgument(
                    absl::Substitute("Fail to do LZ4F decompress: trailing data left in "
                                     "compressed data, read=$0 vs given=$1",
                                     input_size, input.size));
        } else if (lres != 0) {
            // Non-zero hint means LZ4F expects more input: the frame was truncated.
            decompress_failed = true;
            return Status::InvalidArgument(
                    "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres);
        }
        return Status::OK();
    }

private:
    // acquire a compression ctx from pool, release while finish compress,
    // delete if compression failed
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        if (_ctx_c_pool.empty()) {
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new LZ4F CContext");
            }
            auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION);
            if (LZ4F_isError(res) != 0) {
                return Status::InvalidArgument(absl::Substitute(
                        "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res)));
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_c_pool.back());
        _ctx_c_pool.pop_back();
        return Status::OK();
    }
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
        DCHECK(context);
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        _ctx_c_pool.push_back(std::move(context));
    }

    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        if (_ctx_d_pool.empty()) {
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new LZ4F DContext");
            }
            auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION);
            if (LZ4F_isError(res) != 0) {
                return Status::InvalidArgument(absl::Substitute(
                        "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res)));
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_d_pool.back());
        _ctx_d_pool.pop_back();
        return Status::OK();
    }
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
        DCHECK(context);
        // reset decompression context to avoid ERROR_maxBlockSize_invalid
        LZ4F_resetDecompressionContext(context->ctx);
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        _ctx_d_pool.push_back(std::move(context));
    }

private:
    static LZ4F_preferences_t _s_preferences;

    std::mutex _ctx_c_mutex;
    // LZ4F_compressionContext_t is a pointer so no copy here
    std::vector<std::unique_ptr<CContext>> _ctx_c_pool;

    std::mutex _ctx_d_mutex;
    std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
546 | | |
// Frame preferences shared by all Lz4fBlockCompression instances: 256KB linked
// blocks, no content/block checksums, standard frame type, unknown content
// size (0), and default compression level / no auto-flush.
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
        {LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0ULL, 0U,
         LZ4F_noBlockChecksum},
        0,
        0u,
        0u,
        {0u, 0u, 0u}};
554 | | |
// LZ4 high-compression variant: slower to compress than plain LZ4 but emits the
// same block format, so decompression is identical. Contexts are pooled like
// in Lz4BlockCompression.
class Lz4HCBlockCompression : public BlockCompressionCodec {
private:
    // One pooled unit: LZ4HC stream state plus a reusable scratch buffer.
    class Context {
        ENABLE_FACTORY_CREATOR(Context);

    public:
        Context() : ctx(nullptr) {
            // Scratch buffer is shared across queries — account to the global
            // block-compression tracker.
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        LZ4_streamHC_t* ctx;
        std::unique_ptr<faststring> buffer;
        ~Context() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                LZ4_freeStreamHC(ctx);
            }
            buffer.reset();
        }
    };

public:
    static Lz4HCBlockCompression* instance() {
        static Lz4HCBlockCompression s_instance;
        return &s_instance;
    }
    ~Lz4HCBlockCompression() {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_pool.clear();
    }

    Status compress(const Slice& input, faststring* output) override {
        std::unique_ptr<Context> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        // Return the context to the pool only on success; a failed context is
        // dropped and freed by the unique_ptr.
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            Slice compressed_buf;
            size_t max_len = max_compressed_len(input.size);
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // use output directly
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                {
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            size_t compressed_len = LZ4_compress_HC_continue(
                    context->ctx, input.data, compressed_buf.data, cast_set<int>(input.size),
                    static_cast<int>(compressed_buf.size));
            if (compressed_len == 0) {
                compress_failed = true;
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
                                               compressed_buf.size);
            }
            output->resize(compressed_len);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Scratch path: copy the compressed bytes into output.
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
                                    compressed_len);
            }
        } catch (...) {
            // Do not set compress_failed to release context
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do LZ4HC compress due to exception");
        }
        return Status::OK();
    }

    Status decompress(const Slice& input, Slice* output) override {
        auto decompressed_len = LZ4_decompress_safe(
                input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size));
        if (decompressed_len < 0) {
            return Status::InvalidArgument(
                    "destination buffer is not large enough or the source stream is detected "
                    "malformed, fail to do LZ4 decompress, error={}",
                    decompressed_len);
        }
        output->size = decompressed_len;
        return Status::OK();
    }

    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); }

private:
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
        std::lock_guard<std::mutex> l(_ctx_mutex);
        if (_ctx_pool.empty()) {
            std::unique_ptr<Context> localCtx = Context::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("new LZ4HC context error");
            }
            localCtx->ctx = LZ4_createStreamHC();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("LZ4_createStreamHC error");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_pool.back());
        _ctx_pool.pop_back();
        return Status::OK();
    }
    void _release_compression_ctx(std::unique_ptr<Context> context) {
        DCHECK(context);
        // Reset stream state (with the configured level) so the next user
        // starts clean.
        LZ4_resetStreamHC_fast(context->ctx, static_cast<int>(_compression_level));
        std::lock_guard<std::mutex> l(_ctx_mutex);
        _ctx_pool.push_back(std::move(context));
    }

private:
    int64_t _compression_level = config::LZ4_HC_compression_level;
    mutable std::mutex _ctx_mutex;
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
};
685 | | |
686 | | class SnappySlicesSource : public snappy::Source { |
687 | | public: |
688 | | SnappySlicesSource(const std::vector<Slice>& slices) |
689 | 5 | : _available(0), _cur_slice(0), _slice_off(0) { |
690 | 9 | for (auto& slice : slices) { |
691 | | // We filter empty slice here to avoid complicated process |
692 | 9 | if (slice.size == 0) { |
693 | 1 | continue; |
694 | 1 | } |
695 | 8 | _available += slice.size; |
696 | 8 | _slices.push_back(slice); |
697 | 8 | } |
698 | 5 | } |
699 | 5 | ~SnappySlicesSource() override {} |
700 | | |
701 | | // Return the number of bytes left to read from the source |
702 | 5 | size_t Available() const override { return _available; } |
703 | | |
704 | | // Peek at the next flat region of the source. Does not reposition |
705 | | // the source. The returned region is empty iff Available()==0. |
706 | | // |
707 | | // Returns a pointer to the beginning of the region and store its |
708 | | // length in *len. |
709 | | // |
710 | | // The returned region is valid until the next call to Skip() or |
711 | | // until this object is destroyed, whichever occurs first. |
712 | | // |
713 | | // The returned region may be larger than Available() (for example |
714 | | // if this ByteSource is a view on a substring of a larger source). |
715 | | // The caller is responsible for ensuring that it only reads the |
716 | | // Available() bytes. |
717 | 23 | const char* Peek(size_t* len) override { |
718 | 23 | if (_available == 0) { |
719 | 0 | *len = 0; |
720 | 0 | return nullptr; |
721 | 0 | } |
722 | | // we should assure that *len is not 0 |
723 | 23 | *len = _slices[_cur_slice].size - _slice_off; |
724 | 23 | DCHECK(*len != 0); |
725 | 23 | return _slices[_cur_slice].data + _slice_off; |
726 | 23 | } |
727 | | |
728 | | // Skip the next n bytes. Invalidates any buffer returned by |
729 | | // a previous call to Peek(). |
730 | | // REQUIRES: Available() >= n |
731 | 24 | void Skip(size_t n) override { |
732 | 24 | _available -= n; |
733 | 32 | while (n > 0) { |
734 | 23 | auto left = _slices[_cur_slice].size - _slice_off; |
735 | 23 | if (left > n) { |
736 | | // n can be digest in current slice |
737 | 15 | _slice_off += n; |
738 | 15 | return; |
739 | 15 | } |
740 | 8 | _slice_off = 0; |
741 | 8 | _cur_slice++; |
742 | 8 | n -= left; |
743 | 8 | } |
744 | 24 | } |
745 | | |
746 | | private: |
747 | | std::vector<Slice> _slices; |
748 | | size_t _available; |
749 | | size_t _cur_slice; |
750 | | size_t _slice_off; |
751 | | }; |
752 | | |
753 | | class SnappyBlockCompression : public BlockCompressionCodec { |
754 | | public: |
755 | 65.9k | static SnappyBlockCompression* instance() { |
756 | 65.9k | static SnappyBlockCompression s_instance; |
757 | 65.9k | return &s_instance; |
758 | 65.9k | } |
759 | | ~SnappyBlockCompression() override = default; |
760 | | |
761 | 31.8k | Status compress(const Slice& input, faststring* output) override { |
762 | 31.8k | size_t max_len = max_compressed_len(input.size); |
763 | 31.8k | output->resize(max_len); |
764 | 31.8k | Slice s(*output); |
765 | | |
766 | 31.8k | snappy::RawCompress(input.data, input.size, s.data, &s.size); |
767 | 31.8k | output->resize(s.size); |
768 | 31.8k | return Status::OK(); |
769 | 31.8k | } |
770 | | |
771 | 35.0k | Status decompress(const Slice& input, Slice* output) override { |
772 | 35.0k | if (!snappy::RawUncompress(input.data, input.size, output->data)) { |
773 | 0 | return Status::InvalidArgument("Fail to do Snappy decompress"); |
774 | 0 | } |
775 | | // NOTE: GetUncompressedLength only takes O(1) time |
776 | 35.0k | snappy::GetUncompressedLength(input.data, input.size, &output->size); |
777 | 35.0k | return Status::OK(); |
778 | 35.0k | } |
779 | | |
780 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
781 | 5 | faststring* output) override { |
782 | 5 | auto max_len = max_compressed_len(uncompressed_size); |
783 | 5 | output->resize(max_len); |
784 | | |
785 | 5 | SnappySlicesSource source(inputs); |
786 | 5 | snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data())); |
787 | 5 | output->resize(snappy::Compress(&source, &sink)); |
788 | 5 | return Status::OK(); |
789 | 5 | } |
790 | | |
791 | 31.8k | size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); } |
792 | | }; |
793 | | |
class HadoopSnappyBlockCompression : public SnappyBlockCompression {
public:
    static HadoopSnappyBlockCompression* instance() {
        static HadoopSnappyBlockCompression s_instance;
        return &s_instance;
    }
    ~HadoopSnappyBlockCompression() override = default;

    // hadoop use block compression for snappy
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc
    //
    // Splits the input into fixed-size chunks, snappy-compresses each
    // chunk independently, then emits Hadoop's length-prefixed framing.
    Status compress(const Slice& input, faststring* output) override {
        // be same with hadop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java
        size_t snappy_block_size = config::snappy_compression_block_size;
        // leave headroom so a chunk's compressed form fits the block size
        size_t overhead = snappy_block_size / 6 + 32;
        size_t max_input_size = snappy_block_size - overhead;

        size_t data_len = input.size;
        char* data = input.data;
        std::vector<OwnedSlice> buffers;
        size_t out_len = 0;

        // Phase 1: compress each chunk and collect the results; sizes
        // are needed up front to compute the framed output length.
        while (data_len > 0) {
            size_t input_size = std::min(data_len, max_input_size);
            Slice input_slice(data, input_size);
            faststring output_data;
            RETURN_IF_ERROR(SnappyBlockCompression::compress(input_slice, &output_data));
            out_len += output_data.size();
            // the OwnedSlice will be moved here
            buffers.push_back(output_data.build());
            data += input_size;
            data_len -= input_size;
        }

        // hadoop block compression layout:
        // uncompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ...
        // (all length fields are 4-byte big-endian)
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
        output->resize(total_output_len);
        char* output_buffer = (char*)output->data();
        BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size()));
        output_buffer += 4;
        // Phase 2: write each chunk as <big-endian length><compressed bytes>
        for (const auto& buffer : buffers) {
            auto slice = buffer.slice();
            BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size()));
            output_buffer += 4;
            memcpy(output_buffer, slice.get_data(), slice.get_size());
            output_buffer += slice.get_size();
        }

        // sanity check: we filled exactly the bytes we reserved
        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);

        return Status::OK();
    }

    // Decompression of the Hadoop snappy framing is not implemented here.
    Status decompress(const Slice& input, Slice* output) override {
        return Status::InternalError("unimplement: SnappyHadoopBlockCompression::decompress");
    }
};
850 | | |
851 | | class ZlibBlockCompression : public BlockCompressionCodec { |
852 | | public: |
853 | 47 | static ZlibBlockCompression* instance() { |
854 | 47 | static ZlibBlockCompression s_instance; |
855 | 47 | return &s_instance; |
856 | 47 | } |
857 | | ~ZlibBlockCompression() override = default; |
858 | | |
859 | 21 | Status compress(const Slice& input, faststring* output) override { |
860 | 21 | size_t max_len = max_compressed_len(input.size); |
861 | 21 | output->resize(max_len); |
862 | 21 | Slice s(*output); |
863 | | |
864 | 21 | auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size); |
865 | 21 | if (zres == Z_MEM_ERROR) { |
866 | 0 | throw Exception(Status::MemoryLimitExceeded(fmt::format( |
867 | 0 | "ZLib compression failed due to memory allocationerror.error = {}, res = {} ", |
868 | 0 | zError(zres), zres))); |
869 | 21 | } else if (zres != Z_OK) { |
870 | 0 | return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres)); |
871 | 0 | } |
872 | 21 | output->resize(s.size); |
873 | 21 | return Status::OK(); |
874 | 21 | } |
875 | | |
876 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
877 | 1 | faststring* output) override { |
878 | 1 | size_t max_len = max_compressed_len(uncompressed_size); |
879 | 1 | output->resize(max_len); |
880 | | |
881 | 1 | z_stream zstrm; |
882 | 1 | zstrm.zalloc = Z_NULL; |
883 | 1 | zstrm.zfree = Z_NULL; |
884 | 1 | zstrm.opaque = Z_NULL; |
885 | 1 | auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION); |
886 | 1 | if (zres == Z_MEM_ERROR) { |
887 | 0 | throw Exception(Status::MemoryLimitExceeded( |
888 | 0 | "Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres)); |
889 | 1 | } else if (zres != Z_OK) { |
890 | 0 | return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
891 | 0 | zError(zres), zres); |
892 | 0 | } |
893 | | // we assume that output is e |
894 | 1 | zstrm.next_out = (Bytef*)output->data(); |
895 | 1 | zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size()); |
896 | 6 | for (int i = 0; i < inputs.size(); ++i) { |
897 | 5 | if (inputs[i].size == 0) { |
898 | 1 | continue; |
899 | 1 | } |
900 | 4 | zstrm.next_in = (Bytef*)inputs[i].data; |
901 | 4 | zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size); |
902 | 4 | int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH; |
903 | | |
904 | 4 | zres = deflate(&zstrm, flush); |
905 | 4 | if (zres != Z_OK && zres != Z_STREAM_END) { |
906 | 0 | return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
907 | 0 | zError(zres), zres); |
908 | 0 | } |
909 | 4 | } |
910 | | |
911 | 1 | output->resize(zstrm.total_out); |
912 | 1 | zres = deflateEnd(&zstrm); |
913 | 1 | if (zres == Z_DATA_ERROR) { |
914 | 0 | return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres), |
915 | 0 | zres); |
916 | 1 | } else if (zres != Z_OK) { |
917 | 0 | return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
918 | 0 | zError(zres), zres); |
919 | 0 | } |
920 | 1 | return Status::OK(); |
921 | 1 | } |
922 | | |
923 | 34 | Status decompress(const Slice& input, Slice* output) override { |
924 | 34 | size_t input_size = input.size; |
925 | 34 | auto zres = |
926 | 34 | ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); |
927 | 34 | if (zres == Z_DATA_ERROR) { |
928 | 4 | return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres)); |
929 | 30 | } else if (zres == Z_MEM_ERROR) { |
930 | 0 | throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}", |
931 | 0 | zError(zres))); |
932 | 30 | } else if (zres != Z_OK) { |
933 | 7 | return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres)); |
934 | 7 | } |
935 | 23 | return Status::OK(); |
936 | 34 | } |
937 | | |
938 | 22 | size_t max_compressed_len(size_t len) override { |
939 | | // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block |
940 | 22 | return len + 6 + 5 * ((len >> 14) + 1); |
941 | 22 | } |
942 | | }; |
943 | | |
944 | | class Bzip2BlockCompression : public BlockCompressionCodec { |
945 | | public: |
946 | 4 | static Bzip2BlockCompression* instance() { |
947 | 4 | static Bzip2BlockCompression s_instance; |
948 | 4 | return &s_instance; |
949 | 4 | } |
950 | | ~Bzip2BlockCompression() override = default; |
951 | | |
952 | 69 | Status compress(const Slice& input, faststring* output) override { |
953 | 69 | size_t max_len = max_compressed_len(input.size); |
954 | 69 | output->resize(max_len); |
955 | 69 | auto size = cast_set<uint32_t>(output->size()); |
956 | 69 | auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data, |
957 | 69 | cast_set<uint32_t>(input.size), 9, 0, 0); |
958 | 69 | if (bzres == BZ_MEM_ERROR) { |
959 | 0 | throw Exception( |
960 | 0 | Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres)); |
961 | 69 | } else if (bzres == BZ_PARAM_ERROR) { |
962 | 0 | return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres); |
963 | 69 | } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK && |
964 | 69 | bzres != BZ_STREAM_END && bzres != BZ_OK) { |
965 | 0 | return Status::InternalError("Failed to init bz2. status code: {}", bzres); |
966 | 0 | } |
967 | 69 | output->resize(size); |
968 | 69 | return Status::OK(); |
969 | 69 | } |
970 | | |
971 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
972 | 0 | faststring* output) override { |
973 | 0 | size_t max_len = max_compressed_len(uncompressed_size); |
974 | 0 | output->resize(max_len); |
975 | |
|
976 | 0 | bz_stream bzstrm; |
977 | 0 | bzero(&bzstrm, sizeof(bzstrm)); |
978 | 0 | int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0); |
979 | 0 | if (bzres == BZ_PARAM_ERROR) { |
980 | 0 | return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); |
981 | 0 | } else if (bzres == BZ_MEM_ERROR) { |
982 | 0 | throw Exception( |
983 | 0 | Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres)); |
984 | 0 | } else if (bzres != BZ_OK) { |
985 | 0 | return Status::InternalError("Failed to init bz2. status code: {}", bzres); |
986 | 0 | } |
987 | | // we assume that output is e |
988 | 0 | bzstrm.next_out = (char*)output->data(); |
989 | 0 | bzstrm.avail_out = cast_set<uint32_t>(output->size()); |
990 | 0 | for (int i = 0; i < inputs.size(); ++i) { |
991 | 0 | if (inputs[i].size == 0) { |
992 | 0 | continue; |
993 | 0 | } |
994 | 0 | bzstrm.next_in = (char*)inputs[i].data; |
995 | 0 | bzstrm.avail_in = cast_set<uint32_t>(inputs[i].size); |
996 | 0 | int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN; |
997 | |
|
998 | 0 | bzres = BZ2_bzCompress(&bzstrm, flush); |
999 | 0 | if (bzres == BZ_PARAM_ERROR) { |
1000 | 0 | return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres); |
1001 | 0 | } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK && |
1002 | 0 | bzres != BZ_STREAM_END && bzres != BZ_OK) { |
1003 | 0 | return Status::InternalError("Failed to init bz2. status code: {}", bzres); |
1004 | 0 | } |
1005 | 0 | } |
1006 | | |
1007 | 0 | size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32; |
1008 | 0 | output->resize(total_out); |
1009 | 0 | bzres = BZ2_bzCompressEnd(&bzstrm); |
1010 | 0 | if (bzres == BZ_PARAM_ERROR) { |
1011 | 0 | return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres); |
1012 | 0 | } else if (bzres != BZ_OK) { |
1013 | 0 | return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres); |
1014 | 0 | } |
1015 | 0 | return Status::OK(); |
1016 | 0 | } |
1017 | | |
1018 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1019 | 0 | return Status::InternalError("unimplement: Bzip2BlockCompression::decompress"); |
1020 | 0 | } |
1021 | | |
1022 | 69 | size_t max_compressed_len(size_t len) override { |
1023 | | // TODO: make sure the max_compressed_len for bzip2 |
1024 | | // 50 is an estimate fix overhead for bzip2 |
1025 | | // in case the input len is small and BZ2_bzBuffToBuffCompress will return |
1026 | | // BZ_OUTBUFF_FULL |
1027 | 69 | return len * 2 + 50; |
1028 | 69 | } |
1029 | | }; |
1030 | | |
1031 | | // for ZSTD compression and decompression, with BOTH fast and high compression ratio |
class ZstdBlockCompression : public BlockCompressionCodec {
private:
    // Pooled compression context: a ZSTD_CCtx plus a reusable scratch
    // buffer, so steady-state compression does no large allocations.
    class CContext {
        ENABLE_FACTORY_CREATOR(CContext);

    public:
        CContext() : ctx(nullptr) {
            // charge the scratch buffer to the dedicated mem tracker
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        ZSTD_CCtx* ctx;
        std::unique_ptr<faststring> buffer;
        ~CContext() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                ZSTD_freeCCtx(ctx);
            }
            buffer.reset();
        }
    };
    // Pooled decompression context (no scratch buffer needed).
    class DContext {
        ENABLE_FACTORY_CREATOR(DContext);

    public:
        DContext() : ctx(nullptr) {}
        ZSTD_DCtx* ctx;
        ~DContext() {
            if (ctx) {
                ZSTD_freeDCtx(ctx);
            }
        }
    };

public:
    static ZstdBlockCompression* instance() {
        static ZstdBlockCompression s_instance;
        return &s_instance;
    }
    ~ZstdBlockCompression() {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_c_pool.clear();
        _ctx_d_pool.clear();
    }

    size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); }

    // Single-buffer compression delegates to the multi-slice overload.
    Status compress(const Slice& input, faststring* output) override {
        std::vector<Slice> inputs {input};
        return compress(inputs, input.size, output);
    }

    // follow ZSTD official example
    // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        std::unique_ptr<CContext> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        // On failure the context is deliberately dropped instead of
        // being returned to the pool, since its state may be corrupted.
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            size_t max_len = max_compressed_len(uncompressed_size);
            Slice compressed_buf;
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // use output directly
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                {
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            // set compression level to default 3
            auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel,
                                              ZSTD_CLEVEL_DEFAULT);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }
            // set checksum flag to 1
            ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }

            ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};

            for (size_t i = 0; i < inputs.size(); i++) {
                ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};

                // ZSTD_e_end on the last slice both flushes and writes
                // the frame epilogue/checksum.
                bool last_input = (i == inputs.size() - 1);
                auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;

                bool finished = false;
                do {
                    // do compress
                    ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);

                    if (ZSTD_isError(ret)) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 error: {}",
                                                     ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
                    }

                    // ret is ZSTD hint for needed output buffer size
                    if (ret > 0 && out_buf.pos == out_buf.size) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 output buffer full");
                    }

                    // for the last slice "done" means the frame is fully
                    // flushed (ret == 0); otherwise all input consumed.
                    finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
                } while (!finished);
            }

            // set compressed size for caller
            output->resize(out_buf.pos);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
            }
        } catch (std::exception& e) {
            return Status::InternalError("Fail to do ZSTD compress due to exception {}", e.what());
        } catch (...) {
            // Do not set compress_failed to release context
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do ZSTD compress due to exception");
        }

        return Status::OK();
    }

    // Single-shot decompression using a pooled DCtx; output->size must
    // hold the buffer capacity and is set to the actual length on success.
    Status decompress(const Slice& input, Slice* output) override {
        std::unique_ptr<DContext> context;
        bool decompress_failed = false;
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
        Defer defer {[&] {
            if (!decompress_failed) {
                _release_decompression_ctx(std::move(context));
            }
        }};

        size_t ret = ZSTD_decompressDCtx(context->ctx, output->data, output->size, input.data,
                                         input.size);
        if (ZSTD_isError(ret)) {
            decompress_failed = true;
            return Status::InternalError("ZSTD_decompressDCtx error: {}",
                                         ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
        }

        // set decompressed size for caller
        output->size = ret;

        return Status::OK();
    }

private:
    // Pop a CContext from the pool, or lazily create one (and its
    // ZSTD_CCtx) when the pool is empty.
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        if (_ctx_c_pool.empty()) {
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD CContext");
            }
            //typedef LZ4F_cctx* LZ4F_compressionContext_t;
            localCtx->ctx = ZSTD_createCCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Failed to create ZSTD compress ctx");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_c_pool.back());
        _ctx_c_pool.pop_back();
        return Status::OK();
    }
    // Reset the session state and return the context to the pool.
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
        DCHECK(context);
        auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        _ctx_c_pool.push_back(std::move(context));
    }

    // Pop a DContext from the pool, or lazily create one.
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        if (_ctx_d_pool.empty()) {
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD DContext");
            }
            localCtx->ctx = ZSTD_createDCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Fail to init ZSTD decompress context");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_d_pool.back());
        _ctx_d_pool.pop_back();
        return Status::OK();
    }
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
        DCHECK(context);
        // reset ctx to start a new decompress session
        auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        _ctx_d_pool.push_back(std::move(context));
    }

private:
    // context pools, each guarded by its own mutex
    mutable std::mutex _ctx_c_mutex;
    mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool;

    mutable std::mutex _ctx_d_mutex;
    mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
1263 | | |
1264 | | class GzipBlockCompression : public ZlibBlockCompression { |
1265 | | public: |
1266 | 7 | static GzipBlockCompression* instance() { |
1267 | 7 | static GzipBlockCompression s_instance; |
1268 | 7 | return &s_instance; |
1269 | 7 | } |
1270 | | ~GzipBlockCompression() override = default; |
1271 | | |
1272 | 95 | Status compress(const Slice& input, faststring* output) override { |
1273 | 95 | size_t max_len = max_compressed_len(input.size); |
1274 | 95 | output->resize(max_len); |
1275 | | |
1276 | 95 | z_stream z_strm = {}; |
1277 | 95 | z_strm.zalloc = Z_NULL; |
1278 | 95 | z_strm.zfree = Z_NULL; |
1279 | 95 | z_strm.opaque = Z_NULL; |
1280 | | |
1281 | 95 | int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
1282 | 95 | 8, Z_DEFAULT_STRATEGY); |
1283 | | |
1284 | 95 | if (zres == Z_MEM_ERROR) { |
1285 | 0 | throw Exception(Status::MemoryLimitExceeded( |
1286 | 0 | "Fail to init ZLib compress, error={}, res={}", zError(zres), zres)); |
1287 | 95 | } else if (zres != Z_OK) { |
1288 | 0 | return Status::InternalError("Fail to init ZLib compress, error={}, res={}", |
1289 | 0 | zError(zres), zres); |
1290 | 0 | } |
1291 | | |
1292 | 95 | z_strm.next_in = (Bytef*)input.get_data(); |
1293 | 95 | z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.get_size()); |
1294 | 95 | z_strm.next_out = (Bytef*)output->data(); |
1295 | 95 | z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size()); |
1296 | | |
1297 | 95 | zres = deflate(&z_strm, Z_FINISH); |
1298 | 95 | if (zres != Z_OK && zres != Z_STREAM_END) { |
1299 | 0 | return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
1300 | 0 | zError(zres), zres); |
1301 | 0 | } |
1302 | | |
1303 | 95 | output->resize(z_strm.total_out); |
1304 | 95 | zres = deflateEnd(&z_strm); |
1305 | 95 | if (zres == Z_DATA_ERROR) { |
1306 | 0 | return Status::InvalidArgument("Fail to end zlib compress"); |
1307 | 95 | } else if (zres != Z_OK) { |
1308 | 0 | return Status::InternalError("Fail to end zlib compress"); |
1309 | 0 | } |
1310 | 95 | return Status::OK(); |
1311 | 95 | } |
1312 | | |
1313 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
1314 | 0 | faststring* output) override { |
1315 | 0 | size_t max_len = max_compressed_len(uncompressed_size); |
1316 | 0 | output->resize(max_len); |
1317 | |
|
1318 | 0 | z_stream zstrm; |
1319 | 0 | zstrm.zalloc = Z_NULL; |
1320 | 0 | zstrm.zfree = Z_NULL; |
1321 | 0 | zstrm.opaque = Z_NULL; |
1322 | 0 | auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
1323 | 0 | 8, Z_DEFAULT_STRATEGY); |
1324 | 0 | if (zres == Z_MEM_ERROR) { |
1325 | 0 | throw Exception(Status::MemoryLimitExceeded( |
1326 | 0 | "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres)); |
1327 | 0 | } else if (zres != Z_OK) { |
1328 | 0 | return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}", |
1329 | 0 | zError(zres), zres); |
1330 | 0 | } |
1331 | | |
1332 | | // we assume that output is e |
1333 | 0 | zstrm.next_out = (Bytef*)output->data(); |
1334 | 0 | zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size()); |
1335 | 0 | for (int i = 0; i < inputs.size(); ++i) { |
1336 | 0 | if (inputs[i].size == 0) { |
1337 | 0 | continue; |
1338 | 0 | } |
1339 | 0 | zstrm.next_in = (Bytef*)inputs[i].data; |
1340 | 0 | zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size); |
1341 | 0 | int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH; |
1342 | |
|
1343 | 0 | zres = deflate(&zstrm, flush); |
1344 | 0 | if (zres != Z_OK && zres != Z_STREAM_END) { |
1345 | 0 | return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
1346 | 0 | zError(zres), zres); |
1347 | 0 | } |
1348 | 0 | } |
1349 | | |
1350 | 0 | output->resize(zstrm.total_out); |
1351 | 0 | zres = deflateEnd(&zstrm); |
1352 | 0 | if (zres == Z_DATA_ERROR) { |
1353 | 0 | return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
1354 | 0 | zError(zres), zres); |
1355 | 0 | } else if (zres != Z_OK) { |
1356 | 0 | return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}", |
1357 | 0 | zError(zres), zres); |
1358 | 0 | } |
1359 | 0 | return Status::OK(); |
1360 | 0 | } |
1361 | | |
1362 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1363 | 0 | z_stream z_strm = {}; |
1364 | 0 | z_strm.zalloc = Z_NULL; |
1365 | 0 | z_strm.zfree = Z_NULL; |
1366 | 0 | z_strm.opaque = Z_NULL; |
1367 | |
|
1368 | 0 | int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC); |
1369 | 0 | if (ret != Z_OK) { |
1370 | 0 | return Status::InternalError("Fail to init ZLib decompress, error={}, res={}", |
1371 | 0 | zError(ret), ret); |
1372 | 0 | } |
1373 | | |
1374 | | // 1. set input and output |
1375 | 0 | z_strm.next_in = reinterpret_cast<Bytef*>(input.data); |
1376 | 0 | z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.size); |
1377 | 0 | z_strm.next_out = reinterpret_cast<Bytef*>(output->data); |
1378 | 0 | z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size); |
1379 | |
|
1380 | 0 | if (z_strm.avail_out > 0) { |
1381 | | // We only support non-streaming use case for block decompressor |
1382 | 0 | ret = inflate(&z_strm, Z_FINISH); |
1383 | 0 | if (ret != Z_OK && ret != Z_STREAM_END) { |
1384 | 0 | (void)inflateEnd(&z_strm); |
1385 | 0 | if (ret == Z_MEM_ERROR) { |
1386 | 0 | throw Exception(Status::MemoryLimitExceeded( |
1387 | 0 | "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret)); |
1388 | 0 | } else if (ret == Z_DATA_ERROR) { |
1389 | 0 | return Status::InvalidArgument( |
1390 | 0 | "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret); |
1391 | 0 | } |
1392 | 0 | return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}", |
1393 | 0 | zError(ret), ret); |
1394 | 0 | } |
1395 | 0 | } |
1396 | 0 | (void)inflateEnd(&z_strm); |
1397 | |
|
1398 | 0 | return Status::OK(); |
1399 | 0 | } |
1400 | | |
1401 | 95 | size_t max_compressed_len(size_t len) override { |
1402 | 95 | z_stream zstrm; |
1403 | 95 | zstrm.zalloc = Z_NULL; |
1404 | 95 | zstrm.zfree = Z_NULL; |
1405 | 95 | zstrm.opaque = Z_NULL; |
1406 | 95 | auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC, |
1407 | 95 | MEM_LEVEL, Z_DEFAULT_STRATEGY); |
1408 | 95 | if (zres != Z_OK) { |
1409 | | // Fall back to zlib estimate logic for deflate, notice this may |
1410 | | // cause decompress error |
1411 | 0 | LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres) |
1412 | 0 | << ", res=" << zres; |
1413 | 0 | return ZlibBlockCompression::max_compressed_len(len); |
1414 | 95 | } else { |
1415 | 95 | zres = deflateEnd(&zstrm); |
1416 | 95 | if (zres != Z_OK) { |
1417 | 0 | LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres) |
1418 | 0 | << ", res=" << zres; |
1419 | 0 | } |
1420 | | // Mark, maintainer of zlib, has stated that 12 needs to be added to |
1421 | | // result for gzip |
1422 | | // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854 |
1423 | | // To have a safe upper bound for "wrapper variations", we add 32 to |
1424 | | // estimate |
1425 | 95 | auto upper_bound = deflateBound(&zstrm, len) + 32; |
1426 | 95 | return upper_bound; |
1427 | 95 | } |
1428 | 95 | } |
1429 | | |
1430 | | private: |
1431 | | // Magic number for zlib, see https://zlib.net/manual.html for more details. |
1432 | | const static int GZIP_CODEC = 16; // gzip |
1433 | | // The memLevel parameter specifies how much memory should be allocated for |
1434 | | // the internal compression state. |
1435 | | const static int MEM_LEVEL = 8; |
1436 | | }; |
1437 | | |
1438 | | // Only used on x86 or x86_64 |
1439 | | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
1440 | | defined(__i386) || defined(_M_IX86) |
1441 | | class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression { |
1442 | | public: |
1443 | 1 | GzipBlockCompressionByLibdeflate() : GzipBlockCompression() {} |
1444 | 4 | static GzipBlockCompressionByLibdeflate* instance() { |
1445 | 4 | static GzipBlockCompressionByLibdeflate s_instance; |
1446 | 4 | return &s_instance; |
1447 | 4 | } |
1448 | | ~GzipBlockCompressionByLibdeflate() override = default; |
1449 | | |
1450 | 8 | Status decompress(const Slice& input, Slice* output) override { |
1451 | 8 | if (input.empty()) { |
1452 | 0 | output->size = 0; |
1453 | 0 | return Status::OK(); |
1454 | 0 | } |
1455 | 8 | thread_local std::unique_ptr<libdeflate_decompressor, void (*)(libdeflate_decompressor*)> |
1456 | 8 | decompressor {libdeflate_alloc_decompressor(), libdeflate_free_decompressor}; |
1457 | 8 | if (!decompressor) { |
1458 | 0 | return Status::InternalError("libdeflate_alloc_decompressor error."); |
1459 | 0 | } |
1460 | 8 | std::size_t out_len; |
1461 | 8 | auto result = libdeflate_gzip_decompress(decompressor.get(), input.data, input.size, |
1462 | 8 | output->data, output->size, &out_len); |
1463 | 8 | if (result != LIBDEFLATE_SUCCESS) { |
1464 | 0 | return Status::InternalError("libdeflate_gzip_decompress error, res={}", result); |
1465 | 0 | } |
1466 | 8 | return Status::OK(); |
1467 | 8 | } |
1468 | | }; |
1469 | | #endif |
1470 | | |
1471 | | class LzoBlockCompression final : public BlockCompressionCodec { |
1472 | | public: |
1473 | 0 | static LzoBlockCompression* instance() { |
1474 | 0 | static LzoBlockCompression s_instance; |
1475 | 0 | return &s_instance; |
1476 | 0 | } |
1477 | | |
1478 | 0 | Status compress(const Slice& input, faststring* output) override { |
1479 | 0 | return Status::InvalidArgument("not impl lzo compress."); |
1480 | 0 | } |
1481 | 0 | size_t max_compressed_len(size_t len) override { return 0; }; |
1482 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1483 | 0 | auto* input_ptr = input.data; |
1484 | 0 | auto remain_input_size = input.size; |
1485 | 0 | auto* output_ptr = output->data; |
1486 | 0 | auto remain_output_size = output->size; |
1487 | 0 | auto* output_limit = output->data + output->size; |
1488 | | |
1489 | | // Example: |
1490 | | // OriginData(The original data will be divided into several large data block.) : |
1491 | | // large data block1 | large data block2 | large data block3 | .... |
1492 | | // The large data block will be divided into several small data block. |
1493 | | // Suppose a large data block is divided into three small blocks: |
1494 | | // large data block1: | small block1 | small block2 | small block3 | |
1495 | | // CompressData: <A [B1 compress(small block1) ] [B2 compress(small block1) ] [B3 compress(small block1)]> |
1496 | | // |
1497 | | // A : original length of the current block of large data block. |
1498 | | // sizeof(A) = 4 bytes. |
1499 | | // A = length(small block1) + length(small block2) + length(small block3) |
1500 | | // Bx : length of small data block bx. |
1501 | | // sizeof(Bx) = 4 bytes. |
1502 | | // Bx = length(compress(small blockx)) |
1503 | 0 | try { |
1504 | 0 | while (remain_input_size > 0) { |
1505 | 0 | if (remain_input_size < 4) { |
1506 | 0 | return Status::InvalidArgument( |
1507 | 0 | "Need more input buffer to get large_block_uncompressed_len."); |
1508 | 0 | } |
1509 | | |
1510 | 0 | uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr); |
1511 | 0 | input_ptr += 4; |
1512 | 0 | remain_input_size -= 4; |
1513 | |
|
1514 | 0 | if (remain_output_size < large_block_uncompressed_len) { |
1515 | 0 | return Status::InvalidArgument( |
1516 | 0 | "Need more output buffer to get uncompressed data."); |
1517 | 0 | } |
1518 | | |
1519 | 0 | while (large_block_uncompressed_len > 0) { |
1520 | 0 | if (remain_input_size < 4) { |
1521 | 0 | return Status::InvalidArgument( |
1522 | 0 | "Need more input buffer to get small_block_compressed_len."); |
1523 | 0 | } |
1524 | | |
1525 | 0 | uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr); |
1526 | 0 | input_ptr += 4; |
1527 | 0 | remain_input_size -= 4; |
1528 | |
|
1529 | 0 | if (remain_input_size < small_block_compressed_len) { |
1530 | 0 | return Status::InvalidArgument( |
1531 | 0 | "Need more input buffer to decompress small block."); |
1532 | 0 | } |
1533 | | |
1534 | 0 | auto small_block_uncompressed_len = |
1535 | 0 | orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len, |
1536 | 0 | output_ptr, output_limit); |
1537 | |
|
1538 | 0 | input_ptr += small_block_compressed_len; |
1539 | 0 | remain_input_size -= small_block_compressed_len; |
1540 | |
|
1541 | 0 | output_ptr += small_block_uncompressed_len; |
1542 | 0 | large_block_uncompressed_len -= small_block_uncompressed_len; |
1543 | 0 | remain_output_size -= small_block_uncompressed_len; |
1544 | 0 | } |
1545 | 0 | } |
1546 | 0 | } catch (const orc::ParseError& e) { |
1547 | | //Prevent be from hanging due to orc::lzoDecompress throw exception |
1548 | 0 | return Status::InternalError("Fail to do LZO decompress, error={}", e.what()); |
1549 | 0 | } |
1550 | 0 | return Status::OK(); |
1551 | 0 | } |
1552 | | }; |
1553 | | |
1554 | | class BrotliBlockCompression final : public BlockCompressionCodec { |
1555 | | public: |
1556 | 0 | static BrotliBlockCompression* instance() { |
1557 | 0 | static BrotliBlockCompression s_instance; |
1558 | 0 | return &s_instance; |
1559 | 0 | } |
1560 | | |
1561 | 0 | Status compress(const Slice& input, faststring* output) override { |
1562 | 0 | return Status::InvalidArgument("not impl brotli compress."); |
1563 | 0 | } |
1564 | | |
1565 | 0 | size_t max_compressed_len(size_t len) override { return 0; }; |
1566 | | |
1567 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1568 | | // The size of output buffer is always equal to the umcompressed length. |
1569 | 0 | BrotliDecoderResult result = BrotliDecoderDecompress( |
1570 | 0 | input.get_size(), reinterpret_cast<const uint8_t*>(input.get_data()), &output->size, |
1571 | 0 | reinterpret_cast<uint8_t*>(output->data)); |
1572 | 0 | if (result != BROTLI_DECODER_RESULT_SUCCESS) { |
1573 | 0 | return Status::InternalError("Brotli decompression failed, result={}", result); |
1574 | 0 | } |
1575 | 0 | return Status::OK(); |
1576 | 0 | } |
1577 | | }; |
1578 | | |
1579 | | Status get_block_compression_codec(segment_v2::CompressionTypePB type, |
1580 | 24.6M | BlockCompressionCodec** codec) { |
1581 | 24.6M | switch (type) { |
1582 | 8.46M | case segment_v2::CompressionTypePB::NO_COMPRESSION: |
1583 | 8.46M | *codec = nullptr; |
1584 | 8.46M | break; |
1585 | 64.9k | case segment_v2::CompressionTypePB::SNAPPY: |
1586 | 64.9k | *codec = SnappyBlockCompression::instance(); |
1587 | 64.9k | break; |
1588 | 70.7k | case segment_v2::CompressionTypePB::LZ4: |
1589 | 70.7k | *codec = Lz4BlockCompression::instance(); |
1590 | 70.7k | break; |
1591 | 31.1k | case segment_v2::CompressionTypePB::LZ4F: |
1592 | 31.1k | *codec = Lz4fBlockCompression::instance(); |
1593 | 31.1k | break; |
1594 | 2 | case segment_v2::CompressionTypePB::LZ4HC: |
1595 | 2 | *codec = Lz4HCBlockCompression::instance(); |
1596 | 2 | break; |
1597 | 47 | case segment_v2::CompressionTypePB::ZLIB: |
1598 | 47 | *codec = ZlibBlockCompression::instance(); |
1599 | 47 | break; |
1600 | 16.0M | case segment_v2::CompressionTypePB::ZSTD: |
1601 | 16.0M | *codec = ZstdBlockCompression::instance(); |
1602 | 16.0M | break; |
1603 | 0 | default: |
1604 | 0 | return Status::InternalError("unknown compression type({})", type); |
1605 | 24.6M | } |
1606 | | |
1607 | 24.6M | return Status::OK(); |
1608 | 24.6M | } |
1609 | | |
1610 | | // this can only be used in hive text write |
1611 | 172 | Status get_block_compression_codec(TFileCompressType::type type, BlockCompressionCodec** codec) { |
1612 | 172 | switch (type) { |
1613 | 149 | case TFileCompressType::PLAIN: |
1614 | 149 | *codec = nullptr; |
1615 | 149 | break; |
1616 | 0 | case TFileCompressType::ZLIB: |
1617 | 0 | *codec = ZlibBlockCompression::instance(); |
1618 | 0 | break; |
1619 | 7 | case TFileCompressType::GZ: |
1620 | 7 | *codec = GzipBlockCompression::instance(); |
1621 | 7 | break; |
1622 | 4 | case TFileCompressType::BZ2: |
1623 | 4 | *codec = Bzip2BlockCompression::instance(); |
1624 | 4 | break; |
1625 | 4 | case TFileCompressType::LZ4BLOCK: |
1626 | 4 | *codec = HadoopLz4BlockCompression::instance(); |
1627 | 4 | break; |
1628 | 4 | case TFileCompressType::SNAPPYBLOCK: |
1629 | 4 | *codec = HadoopSnappyBlockCompression::instance(); |
1630 | 4 | break; |
1631 | 4 | case TFileCompressType::ZSTD: |
1632 | 4 | *codec = ZstdBlockCompression::instance(); |
1633 | 4 | break; |
1634 | 0 | default: |
1635 | 0 | return Status::InternalError("unsupport compression type({}) int hive text", type); |
1636 | 172 | } |
1637 | | |
1638 | 172 | return Status::OK(); |
1639 | 172 | } |
1640 | | |
1641 | | Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec, |
1642 | 1.23k | BlockCompressionCodec** codec) { |
1643 | 1.23k | switch (parquet_codec) { |
1644 | 118 | case tparquet::CompressionCodec::UNCOMPRESSED: |
1645 | 118 | *codec = nullptr; |
1646 | 118 | break; |
1647 | 1.06k | case tparquet::CompressionCodec::SNAPPY: |
1648 | 1.06k | *codec = SnappyBlockCompression::instance(); |
1649 | 1.06k | break; |
1650 | 4 | case tparquet::CompressionCodec::LZ4_RAW: // we can use LZ4 compression algorithm parse LZ4_RAW |
1651 | 4 | case tparquet::CompressionCodec::LZ4: |
1652 | 4 | *codec = HadoopLz4BlockCompression::instance(); |
1653 | 4 | break; |
1654 | 38 | case tparquet::CompressionCodec::ZSTD: |
1655 | 38 | *codec = ZstdBlockCompression::instance(); |
1656 | 38 | break; |
1657 | 4 | case tparquet::CompressionCodec::GZIP: |
1658 | | // Only used on x86 or x86_64 |
1659 | 4 | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
1660 | 4 | defined(__i386) || defined(_M_IX86) |
1661 | 4 | *codec = GzipBlockCompressionByLibdeflate::instance(); |
1662 | | #else |
1663 | | *codec = GzipBlockCompression::instance(); |
1664 | | #endif |
1665 | 4 | break; |
1666 | 0 | case tparquet::CompressionCodec::LZO: |
1667 | 0 | *codec = LzoBlockCompression::instance(); |
1668 | 0 | break; |
1669 | 0 | case tparquet::CompressionCodec::BROTLI: |
1670 | 0 | *codec = BrotliBlockCompression::instance(); |
1671 | 0 | break; |
1672 | 0 | default: |
1673 | 0 | return Status::InternalError("unknown compression type({})", parquet_codec); |
1674 | 1.23k | } |
1675 | | |
1676 | 1.23k | return Status::OK(); |
1677 | 1.23k | } |
1678 | | |
1679 | | } // namespace doris |