/root/doris/be/src/util/block_compression.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "util/block_compression.h" |
19 | | |
20 | | #include <bzlib.h> |
21 | | #include <gen_cpp/parquet_types.h> |
22 | | #include <gen_cpp/segment_v2.pb.h> |
23 | | #include <glog/logging.h> |
24 | | |
25 | | #include <exception> |
26 | | // Only used on x86 or x86_64 |
27 | | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
28 | | defined(__i386) || defined(_M_IX86) |
29 | | #include <libdeflate.h> |
30 | | #endif |
31 | | #include <brotli/decode.h> |
32 | | #include <glog/log_severity.h> |
33 | | #include <glog/logging.h> |
34 | | #include <lz4/lz4.h> |
35 | | #include <lz4/lz4frame.h> |
36 | | #include <lz4/lz4hc.h> |
37 | | #include <snappy/snappy-sinksource.h> |
38 | | #include <snappy/snappy.h> |
39 | | #include <zconf.h> |
40 | | #include <zlib.h> |
41 | | #include <zstd.h> |
42 | | #include <zstd_errors.h> |
43 | | |
44 | | #include <algorithm> |
45 | | #include <cstdint> |
46 | | #include <limits> |
47 | | #include <mutex> |
48 | | #include <orc/Exceptions.hh> |
49 | | #include <ostream> |
50 | | |
51 | | #include "absl/strings/substitute.h" |
52 | | #include "common/config.h" |
53 | | #include "common/factory_creator.h" |
54 | | #include "exec/decompressor.h" |
55 | | #include "runtime/thread_context.h" |
56 | | #include "util/defer_op.h" |
57 | | #include "util/faststring.h" |
58 | | #include "vec/common/endian.h" |
59 | | |
namespace orc {
// Forward declaration of the LZO decompressor implemented inside the ORC
// library (namespace orc) — presumably not exposed through a public ORC
// header, which is why it is re-declared here. TODO(review): confirm against
// the vendored orc sources.
/**
 * Decompress the bytes in to the output buffer.
 * @param inputAddress the start of the input
 * @param inputLimit one past the last byte of the input
 * @param outputAddress the start of the output buffer
 * @param outputLimit one past the last byte of the output buffer
 * @result the number of bytes decompressed
 */
uint64_t lzoDecompress(const char* inputAddress, const char* inputLimit, char* outputAddress,
                       char* outputLimit);
} // namespace orc
72 | | |
73 | | namespace doris { |
74 | | #include "common/compile_check_begin.h" |
75 | | |
76 | | // exception safe |
77 | | Status BlockCompressionCodec::compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
78 | 142 | faststring* output) { |
79 | 142 | faststring buf; |
80 | | // we compute total size to avoid more memory copy |
81 | 142 | buf.reserve(uncompressed_size); |
82 | 172 | for (auto& input : inputs) { |
83 | 172 | buf.append(input.data, input.size); |
84 | 172 | } |
85 | 142 | return compress(buf, output); |
86 | 142 | } |
87 | | |
88 | 22.9k | bool BlockCompressionCodec::exceed_max_compress_len(size_t uncompressed_size) { |
89 | 22.9k | return uncompressed_size > std::numeric_limits<int32_t>::max(); |
90 | 22.9k | } |
91 | | |
// Raw LZ4 block codec. Compression contexts (an LZ4_stream_t plus a reusable
// scratch buffer) are pooled behind _ctx_mutex so concurrent callers avoid
// re-allocating the stream and the output buffer on every call.
class Lz4BlockCompression : public BlockCompressionCodec {
private:
    // One pool entry: LZ4 stream state plus a scratch output buffer that is
    // reused across compress() calls when the output fits under
    // MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE.
    class Context {
        ENABLE_FACTORY_CREATOR(Context);

    public:
        Context() : ctx(nullptr) {
            // The scratch buffer outlives any single query, so charge it to
            // the process-wide block-compression tracker, not the query's.
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        LZ4_stream_t* ctx;
        std::unique_ptr<faststring> buffer;
        ~Context() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                LZ4_freeStream(ctx);
            }
            buffer.reset();
        }
    };

public:
    static Lz4BlockCompression* instance() {
        static Lz4BlockCompression s_instance;
        return &s_instance;
    }
    ~Lz4BlockCompression() override {
        // Pool entries hold globally-tracked buffers; switch trackers so the
        // release is accounted against the global tracker as well.
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_pool.clear();
    }

    // Compresses `input` into `output`. Inputs larger than LZ4_MAX_INPUT_SIZE
    // are rejected. On failure the acquired context is deliberately NOT
    // returned to the pool (see the Defer below), since its stream state may
    // be inconsistent.
    Status compress(const Slice& input, faststring* output) override {
        if (input.size > LZ4_MAX_INPUT_SIZE) {
            return Status::InvalidArgument(
                    "LZ4 not support those case(input.size>LZ4_MAX_INPUT_SIZE), maybe you should "
                    "change "
                    "fragment_transmission_compression_codec to snappy, input.size={}, "
                    "LZ4_MAX_INPUT_SIZE={}",
                    input.size, LZ4_MAX_INPUT_SIZE);
        }

        std::unique_ptr<Context> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        // Return the context to the pool only on success; a failed context is
        // dropped (and freed by ~Context).
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            Slice compressed_buf;
            size_t max_len = max_compressed_len(input.size);
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Large output: write into `output` directly instead of the
                // pooled scratch buffer, to avoid keeping a huge buffer alive
                // in the pool.
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                // Reuse the context's scratch buffer when
                // max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE.
                {
                    // context->buffer is reusable between queries, so its
                    // growth should be accounted to the global tracker.
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            // input.size was already range-checked above; compressed_buf.size
            // comes from max_compressed_len (LZ4_compressBound), so both casts
            // to int are safe.
            size_t compressed_len = LZ4_compress_fast_continue(
                    context->ctx, input.data, compressed_buf.data, static_cast<int>(input.size),
                    static_cast<int>(compressed_buf.size), ACCELARATION);
            if (compressed_len == 0) {
                compress_failed = true;
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
                                               compressed_buf.size);
            }
            output->resize(compressed_len);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Copy the result out of the pooled scratch buffer.
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
                                    compressed_len);
            }
        } catch (...) {
            // Do not set compress_failed, so the Defer still releases the
            // context back to the pool.
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do LZ4Block compress due to exception");
        }
        return Status::OK();
    }

    // Decompresses a raw LZ4 block. `output->size` must already hold the
    // capacity of output->data; on success it is updated to the actual
    // decompressed length.
    Status decompress(const Slice& input, Slice* output) override {
        auto decompressed_len = LZ4_decompress_safe(
                input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size));
        if (decompressed_len < 0) {
            return Status::InternalError("fail to do LZ4 decompress, error={}", decompressed_len);
        }
        output->size = decompressed_len;
        return Status::OK();
    }

    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); }

private:
    // Pops a context from the pool, creating one (stream + buffer) if the
    // pool is empty.
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
        std::lock_guard<std::mutex> l(_ctx_mutex);
        if (_ctx_pool.empty()) {
            std::unique_ptr<Context> localCtx = Context::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("new LZ4 context error");
            }
            localCtx->ctx = LZ4_createStream();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("LZ4_createStream error");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_pool.back());
        _ctx_pool.pop_back();
        return Status::OK();
    }
    // Resets the stream state and returns the context to the pool.
    void _release_compression_ctx(std::unique_ptr<Context> context) {
        DCHECK(context);
        LZ4_resetStream(context->ctx);
        std::lock_guard<std::mutex> l(_ctx_mutex);
        _ctx_pool.push_back(std::move(context));
    }

private:
    mutable std::mutex _ctx_mutex; // guards _ctx_pool
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
    static const int32_t ACCELARATION = 1; // LZ4 acceleration factor (1 = default)
};
234 | | |
// Hadoop-framed LZ4 codec: splits input into fixed-size chunks, compresses
// each with the parent Lz4BlockCompression, and prefixes big-endian lengths
// the way Hadoop's Lz4Codec does.
class HadoopLz4BlockCompression : public Lz4BlockCompression {
public:
    HadoopLz4BlockCompression() {
        Status st = Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor);
        if (!st.ok()) {
            // Constructor of a singleton: a failure here is unrecoverable.
            throw Exception(Status::FatalError(
                    "HadoopLz4BlockCompression construction failed. status = {}", st));
        }
    }

    ~HadoopLz4BlockCompression() override = default;

    static HadoopLz4BlockCompression* instance() {
        static HadoopLz4BlockCompression s_instance;
        return &s_instance;
    }

    // hadoop uses block compression for lz4
    // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
    Status compress(const Slice& input, faststring* output) override {
        // Keep the chunking in line with Hadoop's Lz4Codec:
        // https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java
        size_t lz4_block_size = config::lz4_compression_block_size;
        // Worst-case LZ4 expansion for a block, subtracted so a compressed
        // chunk never exceeds lz4_block_size.
        size_t overhead = lz4_block_size / 255 + 16;
        size_t max_input_size = lz4_block_size - overhead;

        size_t data_len = input.size;
        char* data = input.data;
        std::vector<OwnedSlice> buffers;
        size_t out_len = 0;

        // Compress chunk by chunk; each chunk becomes one length-prefixed
        // record in the output.
        while (data_len > 0) {
            size_t input_size = std::min(data_len, max_input_size);
            Slice input_slice(data, input_size);
            faststring output_data;
            RETURN_IF_ERROR(Lz4BlockCompression::compress(input_slice, &output_data));
            out_len += output_data.size();
            buffers.push_back(output_data.build());
            data += input_size;
            data_len -= input_size;
        }

        // Hadoop block layout: uncompressed_length | compressed_length1 |
        // compressed_data1 | compressed_length2 | compressed_data2 | ...
        // (all length fields are 4-byte big-endian)
        size_t total_output_len = 4 + 4 * buffers.size() + out_len;
        output->resize(total_output_len);
        char* output_buffer = (char*)output->data();
        BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size()));
        output_buffer += 4;
        for (const auto& buffer : buffers) {
            auto slice = buffer.slice();
            BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size()));
            output_buffer += 4;
            memcpy(output_buffer, slice.get_data(), slice.get_size());
            output_buffer += slice.get_size();
        }

        DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len);

        return Status::OK();
    }

    Status decompress(const Slice& input, Slice* output) override {
        size_t input_bytes_read = 0;
        size_t decompressed_len = 0;
        size_t more_input_bytes = 0;
        size_t more_output_bytes = 0;
        bool stream_end = false;
        auto st = _decompressor->decompress((uint8_t*)input.data, cast_set<uint32_t>(input.size),
                                            &input_bytes_read, (uint8_t*)output->data,
                                            cast_set<uint32_t>(output->size), &decompressed_len,
                                            &stream_end, &more_input_bytes, &more_output_bytes);
        // Try Hadoop-framed LZ4 first; if it fails (or did not reach stream
        // end), fall back to plain raw-block LZ4 decompression.
        return (st != Status::OK() || stream_end != true)
                       ? Lz4BlockCompression::decompress(input, output)
                       : Status::OK();
    }

private:
    std::unique_ptr<Decompressor> _decompressor;
};
// Used for LZ4 frame format; decompress speed is about two times faster than
// raw-block LZ4 per the original author's note.
// Maintains two separate context pools: compression contexts (which also
// carry a reusable scratch buffer) and decompression contexts.
class Lz4fBlockCompression : public BlockCompressionCodec {
private:
    // Pooled compression context: LZ4F context plus a reusable scratch
    // output buffer, accounted to the global block-compression tracker.
    class CContext {
        ENABLE_FACTORY_CREATOR(CContext);

    public:
        CContext() : ctx(nullptr) {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        LZ4F_compressionContext_t ctx;
        std::unique_ptr<faststring> buffer;
        ~CContext() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                LZ4F_freeCompressionContext(ctx);
            }
            buffer.reset();
        }
    };
    // Pooled decompression context; holds only the LZ4F context, no buffer.
    class DContext {
        ENABLE_FACTORY_CREATOR(DContext);

    public:
        DContext() : ctx(nullptr) {}
        LZ4F_decompressionContext_t ctx;
        ~DContext() {
            if (ctx) {
                LZ4F_freeDecompressionContext(ctx);
            }
        }
    };

public:
    static Lz4fBlockCompression* instance() {
        static Lz4fBlockCompression s_instance;
        return &s_instance;
    }
    ~Lz4fBlockCompression() {
        // Pool entries hold globally-tracked buffers; release under the
        // global tracker.
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_c_pool.clear();
        _ctx_d_pool.clear();
    }

    Status compress(const Slice& input, faststring* output) override {
        std::vector<Slice> inputs {input};
        return compress(inputs, input.size, output);
    }

    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        return _compress(inputs, uncompressed_size, output);
    }

    Status decompress(const Slice& input, Slice* output) override {
        return _decompress(input, output);
    }

    // Upper bound for one frame holding `len` bytes. Takes the max of the
    // streaming bound and the single-shot frame bound to be safe for the
    // begin/update/end sequence used in _compress.
    size_t max_compressed_len(size_t len) override {
        return std::max(LZ4F_compressBound(len, &_s_preferences),
                        LZ4F_compressFrameBound(len, &_s_preferences));
    }

private:
    // Writes one LZ4 frame: header (compressBegin), one compressUpdate per
    // input slice, then the end mark (compressEnd). `offset` tracks how many
    // bytes have been emitted into compressed_buf.
    Status _compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                     faststring* output) {
        std::unique_ptr<CContext> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        // On any failure the context is dropped instead of being pooled,
        // since the frame was left half-written.
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            Slice compressed_buf;
            size_t max_len = max_compressed_len(uncompressed_size);
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Large output: write into `output` directly.
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                {
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    // Reuse the context buffer when
                    // max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE.
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            auto wbytes = LZ4F_compressBegin(context->ctx, compressed_buf.data, compressed_buf.size,
                                             &_s_preferences);
            if (LZ4F_isError(wbytes)) {
                compress_failed = true;
                return Status::InvalidArgument("Fail to do LZ4F compress begin, res={}",
                                               LZ4F_getErrorName(wbytes));
            }
            size_t offset = wbytes;
            for (auto input : inputs) {
                wbytes = LZ4F_compressUpdate(context->ctx, compressed_buf.data + offset,
                                             compressed_buf.size - offset, input.data, input.size,
                                             nullptr);
                if (LZ4F_isError(wbytes)) {
                    compress_failed = true;
                    return Status::InvalidArgument("Fail to do LZ4F compress update, res={}",
                                                   LZ4F_getErrorName(wbytes));
                }
                offset += wbytes;
            }
            wbytes = LZ4F_compressEnd(context->ctx, compressed_buf.data + offset,
                                      compressed_buf.size - offset, nullptr);
            if (LZ4F_isError(wbytes)) {
                compress_failed = true;
                return Status::InvalidArgument("Fail to do LZ4F compress end, res={}",
                                               LZ4F_getErrorName(wbytes));
            }
            offset += wbytes;
            output->resize(offset);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Copy the result out of the pooled scratch buffer.
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), offset);
            }
        } catch (...) {
            // Do not set compress_failed, so the Defer still releases the
            // context back to the pool.
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do LZ4F compress due to exception");
        }

        return Status::OK();
    }

    // Decompresses one complete frame in a single call. Fails if the frame is
    // truncated (lres != 0) or if trailing bytes remain unread.
    Status _decompress(const Slice& input, Slice* output) {
        bool decompress_failed = false;
        std::unique_ptr<DContext> context;
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
        Defer defer {[&] {
            if (!decompress_failed) {
                _release_decompression_ctx(std::move(context));
            }
        }};
        size_t input_size = input.size;
        auto lres = LZ4F_decompress(context->ctx, output->data, &output->size, input.data,
                                    &input_size, nullptr);
        if (LZ4F_isError(lres)) {
            decompress_failed = true;
            return Status::InternalError("Fail to do LZ4F decompress, res={}",
                                         LZ4F_getErrorName(lres));
        } else if (input_size != input.size) {
            decompress_failed = true;
            return Status::InvalidArgument(
                    absl::Substitute("Fail to do LZ4F decompress: trailing data left in "
                                     "compressed data, read=$0 vs given=$1",
                                     input_size, input.size));
        } else if (lres != 0) {
            // Non-zero return means LZ4F expects more input: the frame was
            // not complete within `input`.
            decompress_failed = true;
            return Status::InvalidArgument(
                    "Fail to do LZ4F decompress: expect more compressed data, expect={}", lres);
        }
        return Status::OK();
    }

private:
    // Acquire a compression ctx from the pool; it is released when compress
    // finishes, or deleted if compression failed.
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        if (_ctx_c_pool.empty()) {
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new LZ4F CContext");
            }
            auto res = LZ4F_createCompressionContext(&localCtx->ctx, LZ4F_VERSION);
            if (LZ4F_isError(res) != 0) {
                return Status::InvalidArgument(absl::Substitute(
                        "LZ4F_createCompressionContext error, res=$0", LZ4F_getErrorName(res)));
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_c_pool.back());
        _ctx_c_pool.pop_back();
        return Status::OK();
    }
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
        DCHECK(context);
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        _ctx_c_pool.push_back(std::move(context));
    }

    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        if (_ctx_d_pool.empty()) {
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new LZ4F DContext");
            }
            auto res = LZ4F_createDecompressionContext(&localCtx->ctx, LZ4F_VERSION);
            if (LZ4F_isError(res) != 0) {
                return Status::InvalidArgument(absl::Substitute(
                        "LZ4F_createDeompressionContext error, res=$0", LZ4F_getErrorName(res)));
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_d_pool.back());
        _ctx_d_pool.pop_back();
        return Status::OK();
    }
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
        DCHECK(context);
        // Reset decompression context before pooling to avoid
        // ERROR_maxBlockSize_invalid on reuse.
        LZ4F_resetDecompressionContext(context->ctx);
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        _ctx_d_pool.push_back(std::move(context));
    }

private:
    // Shared frame preferences used by every compress call (defined below).
    static LZ4F_preferences_t _s_preferences;

    std::mutex _ctx_c_mutex;
    // LZ4F_compressionContext_t is a pointer so no copy here
    std::vector<std::unique_ptr<CContext>> _ctx_c_pool;

    std::mutex _ctx_d_mutex;
    std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
547 | | |
// Frame preferences shared by all LZ4F compress calls: 256KB linked blocks,
// no content/block checksums, LZ4F_frame type, unknown content size (0), and
// default compression level (0). Remaining fields are reserved zeros.
LZ4F_preferences_t Lz4fBlockCompression::_s_preferences = {
        {LZ4F_max256KB, LZ4F_blockLinked, LZ4F_noContentChecksum, LZ4F_frame, 0ULL, 0U,
         LZ4F_noBlockChecksum},
        0,
        0u,
        0u,
        {0u, 0u, 0u}};
555 | | |
// LZ4 high-compression codec. Same pooled-context pattern as
// Lz4BlockCompression, but with LZ4_streamHC_t and the configured HC
// compression level.
class Lz4HCBlockCompression : public BlockCompressionCodec {
private:
    // Pool entry: HC stream state plus a reusable scratch output buffer.
    class Context {
        ENABLE_FACTORY_CREATOR(Context);

    public:
        Context() : ctx(nullptr) {
            // Scratch buffer is long-lived; charge it to the global tracker.
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        LZ4_streamHC_t* ctx;
        std::unique_ptr<faststring> buffer;
        ~Context() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                LZ4_freeStreamHC(ctx);
            }
            buffer.reset();
        }
    };

public:
    static Lz4HCBlockCompression* instance() {
        static Lz4HCBlockCompression s_instance;
        return &s_instance;
    }
    ~Lz4HCBlockCompression() {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_pool.clear();
    }

    // Compresses `input` into `output` with LZ4 HC. On failure the context is
    // dropped rather than pooled (see the Defer).
    Status compress(const Slice& input, faststring* output) override {
        std::unique_ptr<Context> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            Slice compressed_buf;
            size_t max_len = max_compressed_len(input.size);
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Large output: write into `output` directly.
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                {
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    // Reuse the context buffer when
                    // max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE.
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            // LZ4_compress_HC_continue returns 0 on failure; sizes are within
            // int range per max_compressed_len / cast_set.
            size_t compressed_len = LZ4_compress_HC_continue(
                    context->ctx, input.data, compressed_buf.data, cast_set<int>(input.size),
                    static_cast<int>(compressed_buf.size));
            if (compressed_len == 0) {
                compress_failed = true;
                return Status::InvalidArgument("Output buffer's capacity is not enough, size={}",
                                               compressed_buf.size);
            }
            output->resize(compressed_len);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Copy the result out of the pooled scratch buffer.
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data),
                                    compressed_len);
            }
        } catch (...) {
            // Do not set compress_failed, so the Defer still releases the
            // context back to the pool.
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do LZ4HC compress due to exception");
        }
        return Status::OK();
    }

    // HC output is a normal raw LZ4 block, so plain LZ4_decompress_safe works.
    Status decompress(const Slice& input, Slice* output) override {
        auto decompressed_len = LZ4_decompress_safe(
                input.data, output->data, cast_set<int>(input.size), cast_set<int>(output->size));
        if (decompressed_len < 0) {
            return Status::InvalidArgument(
                    "destination buffer is not large enough or the source stream is detected "
                    "malformed, fail to do LZ4 decompress, error={}",
                    decompressed_len);
        }
        output->size = decompressed_len;
        return Status::OK();
    }

    size_t max_compressed_len(size_t len) override { return LZ4_compressBound(cast_set<int>(len)); }

private:
    // Pops a context from the pool, creating one if the pool is empty.
    Status _acquire_compression_ctx(std::unique_ptr<Context>& out) {
        std::lock_guard<std::mutex> l(_ctx_mutex);
        if (_ctx_pool.empty()) {
            std::unique_ptr<Context> localCtx = Context::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("new LZ4HC context error");
            }
            localCtx->ctx = LZ4_createStreamHC();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("LZ4_createStreamHC error");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_pool.back());
        _ctx_pool.pop_back();
        return Status::OK();
    }
    // Re-arms the stream at the configured level and returns it to the pool.
    void _release_compression_ctx(std::unique_ptr<Context> context) {
        DCHECK(context);
        LZ4_resetStreamHC_fast(context->ctx, static_cast<int>(_compression_level));
        std::lock_guard<std::mutex> l(_ctx_mutex);
        _ctx_pool.push_back(std::move(context));
    }

private:
    int64_t _compression_level = config::LZ4_HC_compression_level;
    mutable std::mutex _ctx_mutex; // guards _ctx_pool
    mutable std::vector<std::unique_ptr<Context>> _ctx_pool;
};
686 | | |
687 | | class SnappySlicesSource : public snappy::Source { |
688 | | public: |
689 | | SnappySlicesSource(const std::vector<Slice>& slices) |
690 | 1 | : _available(0), _cur_slice(0), _slice_off(0) { |
691 | 5 | for (auto& slice : slices) { |
692 | | // We filter empty slice here to avoid complicated process |
693 | 5 | if (slice.size == 0) { |
694 | 1 | continue; |
695 | 1 | } |
696 | 4 | _available += slice.size; |
697 | 4 | _slices.push_back(slice); |
698 | 4 | } |
699 | 1 | } |
700 | 1 | ~SnappySlicesSource() override {} |
701 | | |
702 | | // Return the number of bytes left to read from the source |
703 | 1 | size_t Available() const override { return _available; } |
704 | | |
705 | | // Peek at the next flat region of the source. Does not reposition |
706 | | // the source. The returned region is empty iff Available()==0. |
707 | | // |
708 | | // Returns a pointer to the beginning of the region and store its |
709 | | // length in *len. |
710 | | // |
711 | | // The returned region is valid until the next call to Skip() or |
712 | | // until this object is destroyed, whichever occurs first. |
713 | | // |
714 | | // The returned region may be larger than Available() (for example |
715 | | // if this ByteSource is a view on a substring of a larger source). |
716 | | // The caller is responsible for ensuring that it only reads the |
717 | | // Available() bytes. |
718 | 19 | const char* Peek(size_t* len) override { |
719 | 19 | if (_available == 0) { |
720 | 0 | *len = 0; |
721 | 0 | return nullptr; |
722 | 0 | } |
723 | | // we should assure that *len is not 0 |
724 | 19 | *len = _slices[_cur_slice].size - _slice_off; |
725 | 19 | DCHECK(*len != 0); |
726 | 19 | return _slices[_cur_slice].data + _slice_off; |
727 | 19 | } |
728 | | |
729 | | // Skip the next n bytes. Invalidates any buffer returned by |
730 | | // a previous call to Peek(). |
731 | | // REQUIRES: Available() >= n |
732 | 20 | void Skip(size_t n) override { |
733 | 20 | _available -= n; |
734 | 24 | while (n > 0) { |
735 | 19 | auto left = _slices[_cur_slice].size - _slice_off; |
736 | 19 | if (left > n) { |
737 | | // n can be digest in current slice |
738 | 15 | _slice_off += n; |
739 | 15 | return; |
740 | 15 | } |
741 | 4 | _slice_off = 0; |
742 | 4 | _cur_slice++; |
743 | 4 | n -= left; |
744 | 4 | } |
745 | 20 | } |
746 | | |
747 | | private: |
748 | | std::vector<Slice> _slices; |
749 | | size_t _available; |
750 | | size_t _cur_slice; |
751 | | size_t _slice_off; |
752 | | }; |
753 | | |
754 | | class SnappyBlockCompression : public BlockCompressionCodec { |
755 | | public: |
756 | 155 | static SnappyBlockCompression* instance() { |
757 | 155 | static SnappyBlockCompression s_instance; |
758 | 155 | return &s_instance; |
759 | 155 | } |
760 | 1 | ~SnappyBlockCompression() override = default; |
761 | | |
762 | 34 | Status compress(const Slice& input, faststring* output) override { |
763 | 34 | size_t max_len = max_compressed_len(input.size); |
764 | 34 | output->resize(max_len); |
765 | 34 | Slice s(*output); |
766 | | |
767 | 34 | snappy::RawCompress(input.data, input.size, s.data, &s.size); |
768 | 34 | output->resize(s.size); |
769 | 34 | return Status::OK(); |
770 | 34 | } |
771 | | |
772 | 145 | Status decompress(const Slice& input, Slice* output) override { |
773 | 145 | if (!snappy::RawUncompress(input.data, input.size, output->data)) { |
774 | 0 | return Status::InvalidArgument("Fail to do Snappy decompress"); |
775 | 0 | } |
776 | | // NOTE: GetUncompressedLength only takes O(1) time |
777 | 145 | snappy::GetUncompressedLength(input.data, input.size, &output->size); |
778 | 145 | return Status::OK(); |
779 | 145 | } |
780 | | |
781 | | Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size, |
782 | 1 | faststring* output) override { |
783 | 1 | auto max_len = max_compressed_len(uncompressed_size); |
784 | 1 | output->resize(max_len); |
785 | | |
786 | 1 | SnappySlicesSource source(inputs); |
787 | 1 | snappy::UncheckedByteArraySink sink(reinterpret_cast<char*>(output->data())); |
788 | 1 | output->resize(snappy::Compress(&source, &sink)); |
789 | 1 | return Status::OK(); |
790 | 1 | } |
791 | | |
792 | 35 | size_t max_compressed_len(size_t len) override { return snappy::MaxCompressedLength(len); } |
793 | | }; |
794 | | |
795 | | class HadoopSnappyBlockCompression : public SnappyBlockCompression { |
796 | | public: |
797 | 0 | static HadoopSnappyBlockCompression* instance() { |
798 | 0 | static HadoopSnappyBlockCompression s_instance; |
799 | 0 | return &s_instance; |
800 | 0 | } |
801 | 0 | ~HadoopSnappyBlockCompression() override = default; |
802 | | |
803 | | // hadoop use block compression for snappy |
804 | | // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/SnappyCodec.cc |
805 | 0 | Status compress(const Slice& input, faststring* output) override { |
806 | | // be same with hadop https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/SnappyCodec.java |
807 | 0 | size_t snappy_block_size = config::snappy_compression_block_size; |
808 | 0 | size_t overhead = snappy_block_size / 6 + 32; |
809 | 0 | size_t max_input_size = snappy_block_size - overhead; |
810 | |
|
811 | 0 | size_t data_len = input.size; |
812 | 0 | char* data = input.data; |
813 | 0 | std::vector<OwnedSlice> buffers; |
814 | 0 | size_t out_len = 0; |
815 | |
|
816 | 0 | while (data_len > 0) { |
817 | 0 | size_t input_size = std::min(data_len, max_input_size); |
818 | 0 | Slice input_slice(data, input_size); |
819 | 0 | faststring output_data; |
820 | 0 | RETURN_IF_ERROR(SnappyBlockCompression::compress(input_slice, &output_data)); |
821 | 0 | out_len += output_data.size(); |
822 | | // the OwnedSlice will be moved here |
823 | 0 | buffers.push_back(output_data.build()); |
824 | 0 | data += input_size; |
825 | 0 | data_len -= input_size; |
826 | 0 | } |
827 | | |
828 | | // hadoop block compression: umcompressed_length | compressed_length1 | compressed_data1 | compressed_length2 | compressed_data2 | ... |
829 | 0 | size_t total_output_len = 4 + 4 * buffers.size() + out_len; |
830 | 0 | output->resize(total_output_len); |
831 | 0 | char* output_buffer = (char*)output->data(); |
832 | 0 | BigEndian::Store32(output_buffer, cast_set<uint32_t>(input.get_size())); |
833 | 0 | output_buffer += 4; |
834 | 0 | for (const auto& buffer : buffers) { |
835 | 0 | auto slice = buffer.slice(); |
836 | 0 | BigEndian::Store32(output_buffer, cast_set<uint32_t>(slice.get_size())); |
837 | 0 | output_buffer += 4; |
838 | 0 | memcpy(output_buffer, slice.get_data(), slice.get_size()); |
839 | 0 | output_buffer += slice.get_size(); |
840 | 0 | } |
841 | |
|
842 | 0 | DCHECK_EQ(output_buffer - (char*)output->data(), total_output_len); |
843 | |
|
844 | 0 | return Status::OK(); |
845 | 0 | } |
846 | | |
847 | 0 | Status decompress(const Slice& input, Slice* output) override { |
848 | 0 | return Status::InternalError("unimplement: SnappyHadoopBlockCompression::decompress"); |
849 | 0 | } |
850 | | }; |
851 | | |
// Codec for zlib-wrapped deflate streams.
class ZlibBlockCompression : public BlockCompressionCodec {
public:
    static ZlibBlockCompression* instance() {
        static ZlibBlockCompression s_instance;
        return &s_instance;
    }
    ~ZlibBlockCompression() override = default;

    // One-shot compression of a single buffer via ::compress(); `output` is
    // shrunk to the exact compressed length on success.
    Status compress(const Slice& input, faststring* output) override {
        size_t max_len = max_compressed_len(input.size);
        output->resize(max_len);
        Slice s(*output);

        auto zres = ::compress((Bytef*)s.data, &s.size, (Bytef*)input.data, input.size);
        if (zres == Z_MEM_ERROR) {
            // Bug fix: message previously read "allocationerror.error" (missing
            // spaces), garbling the log line.
            throw Exception(Status::MemoryLimitExceeded(fmt::format(
                    "ZLib compression failed due to memory allocation error. error = {}, res = {} ",
                    zError(zres), zres)));
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to do Zlib compress, error={}", zError(zres));
        }
        output->resize(s.size);
        return Status::OK();
    }

    // Streaming compression of several slices into one zlib stream.
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        size_t max_len = max_compressed_len(uncompressed_size);
        output->resize(max_len);

        z_stream zstrm;
        zstrm.zalloc = Z_NULL;
        zstrm.zfree = Z_NULL;
        zstrm.opaque = Z_NULL;
        auto zres = deflateInit(&zstrm, Z_DEFAULT_COMPRESSION);
        if (zres == Z_MEM_ERROR) {
            throw Exception(Status::MemoryLimitExceeded(
                    "Fail to do ZLib stream compress, error={}, res={}", zError(zres), zres));
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
                                         zError(zres), zres);
        }
        // The whole compressed stream must fit in `output`, which was sized
        // with max_compressed_len() above, so deflate() is never short on
        // output space.
        zstrm.next_out = (Bytef*)output->data();
        zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size());
        // Tracks whether deflate() has been asked to finish the stream.
        bool finish_issued = false;
        for (size_t i = 0; i < inputs.size(); ++i) {
            if (inputs[i].size == 0) {
                continue;
            }
            zstrm.next_in = (Bytef*)inputs[i].data;
            zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size);
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;

            zres = deflate(&zstrm, flush);
            if (zres != Z_OK && zres != Z_STREAM_END) {
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
                                             zError(zres), zres);
            }
            finish_issued = (flush == Z_FINISH);
        }

        // Bug fix: if all slices were empty, or the LAST slice was empty and
        // therefore skipped by the loop above, Z_FINISH was never issued and
        // the stream is still open — deflateEnd() would then fail with
        // Z_DATA_ERROR and the emitted bytes would be an incomplete stream.
        // Finish the stream explicitly with an empty input in that case.
        if (!finish_issued) {
            zstrm.next_in = nullptr;
            zstrm.avail_in = 0;
            zres = deflate(&zstrm, Z_FINISH);
            if (zres != Z_OK && zres != Z_STREAM_END) {
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
                                             zError(zres), zres);
            }
        }

        output->resize(zstrm.total_out);
        zres = deflateEnd(&zstrm);
        if (zres == Z_DATA_ERROR) {
            return Status::InvalidArgument("Fail to do deflateEnd, error={}, res={}", zError(zres),
                                           zres);
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
                                         zError(zres), zres);
        }
        return Status::OK();
    }

    // Decompress a zlib stream into a caller-sized buffer. output->size must
    // be preset to the buffer capacity; uncompress2 updates it to the actual
    // uncompressed length on success.
    Status decompress(const Slice& input, Slice* output) override {
        size_t input_size = input.size;
        auto zres =
                ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size);
        if (zres == Z_DATA_ERROR) {
            return Status::InvalidArgument("Fail to do ZLib decompress, error={}", zError(zres));
        } else if (zres == Z_MEM_ERROR) {
            throw Exception(Status::MemoryLimitExceeded("Fail to do ZLib decompress, error={}",
                                                        zError(zres)));
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to do ZLib decompress, error={}", zError(zres));
        }
        return Status::OK();
    }

    size_t max_compressed_len(size_t len) override {
        // one-time overhead of six bytes for the entire stream plus five bytes per 16 KB block
        return len + 6 + 5 * ((len >> 14) + 1);
    }
};
944 | | |
// Codec for bzip2. Compression-only: decompress() is intentionally
// unimplemented. Uses the maximum block size (9) and default work factor.
class Bzip2BlockCompression : public BlockCompressionCodec {
public:
    static Bzip2BlockCompression* instance() {
        static Bzip2BlockCompression s_instance;
        return &s_instance;
    }
    ~Bzip2BlockCompression() override = default;

    // One-shot compression of a single buffer; `output` is shrunk to the
    // exact compressed length on success.
    Status compress(const Slice& input, faststring* output) override {
        size_t max_len = max_compressed_len(input.size);
        output->resize(max_len);
        auto size = cast_set<uint32_t>(output->size());
        auto bzres = BZ2_bzBuffToBuffCompress((char*)output->data(), &size, (char*)input.data,
                                              cast_set<uint32_t>(input.size), 9, 0, 0);
        if (bzres == BZ_MEM_ERROR) {
            throw Exception(
                    Status::MemoryLimitExceeded("Fail to do Bzip2 compress, ret={}", bzres));
        } else if (bzres == BZ_PARAM_ERROR) {
            return Status::InvalidArgument("Fail to do Bzip2 compress, ret={}", bzres);
        } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
                   bzres != BZ_STREAM_END && bzres != BZ_OK) {
            // NOTE(review): message looks copy-pasted from the init path; this
            // branch reports a compression failure, not an init failure.
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
        }
        output->resize(size);
        return Status::OK();
    }

    // Streaming compression of several slices into one bzip2 stream.
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        size_t max_len = max_compressed_len(uncompressed_size);
        output->resize(max_len);

        bz_stream bzstrm;
        bzero(&bzstrm, sizeof(bzstrm));
        int bzres = BZ2_bzCompressInit(&bzstrm, 9, 0, 0);
        if (bzres == BZ_PARAM_ERROR) {
            return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
        } else if (bzres == BZ_MEM_ERROR) {
            throw Exception(
                    Status::MemoryLimitExceeded("Failed to init bz2. status code: {}", bzres));
        } else if (bzres != BZ_OK) {
            return Status::InternalError("Failed to init bz2. status code: {}", bzres);
        }
        // We assume `output` (sized with max_compressed_len above) is large
        // enough to hold the whole compressed stream in one pass.
        bzstrm.next_out = (char*)output->data();
        bzstrm.avail_out = cast_set<uint32_t>(output->size());
        for (int i = 0; i < inputs.size(); ++i) {
            if (inputs[i].size == 0) {
                continue;
            }
            bzstrm.next_in = (char*)inputs[i].data;
            bzstrm.avail_in = cast_set<uint32_t>(inputs[i].size);
            // BZ_FINISH closes the stream on the last slice.
            // NOTE(review): if the last slice is empty it is skipped above, so
            // BZ_FINISH would never be issued — confirm callers never pass a
            // trailing empty slice (same pattern exists in the zlib codec).
            int flush = (i == (inputs.size() - 1)) ? BZ_FINISH : BZ_RUN;

            bzres = BZ2_bzCompress(&bzstrm, flush);
            if (bzres == BZ_PARAM_ERROR) {
                return Status::InvalidArgument("Failed to init bz2. status code: {}", bzres);
            } else if (bzres != BZ_RUN_OK && bzres != BZ_FLUSH_OK && bzres != BZ_FINISH_OK &&
                       bzres != BZ_STREAM_END && bzres != BZ_OK) {
                // NOTE(review): "Failed to init" is misleading here too; this
                // is a BZ2_bzCompress failure.
                return Status::InternalError("Failed to init bz2. status code: {}", bzres);
            }
        }

        // bzlib reports the total output as two 32-bit halves; reassemble.
        size_t total_out = (size_t)bzstrm.total_out_hi32 << 32 | (size_t)bzstrm.total_out_lo32;
        output->resize(total_out);
        bzres = BZ2_bzCompressEnd(&bzstrm);
        if (bzres == BZ_PARAM_ERROR) {
            return Status::InvalidArgument("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
        } else if (bzres != BZ_OK) {
            return Status::InternalError("Fail to do deflateEnd on bzip2 stream, res={}", bzres);
        }
        return Status::OK();
    }

    // Decompression is not supported by this codec.
    Status decompress(const Slice& input, Slice* output) override {
        return Status::InternalError("unimplement: Bzip2BlockCompression::decompress");
    }

    size_t max_compressed_len(size_t len) override {
        // TODO: make sure the max_compressed_len for bzip2
        // (len * 2 is a heuristic, not a documented bzip2 worst-case bound.)
        return len * 2;
    }
};
1028 | | |
// for ZSTD compression and decompression, with BOTH fast and high compression ratio
//
// Maintains per-direction pools of ZSTD contexts (guarded by mutexes) so that
// contexts — and, for compression, a reusable scratch buffer — survive across
// calls instead of being allocated per call. Context/buffer allocations are
// charged to the block-compression memory tracker.
class ZstdBlockCompression : public BlockCompressionCodec {
private:
    // Pooled compression state: one ZSTD_CCtx plus a reusable output buffer
    // for compressions small enough to bounce through it.
    class CContext {
        ENABLE_FACTORY_CREATOR(CContext);

    public:
        CContext() : ctx(nullptr) {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            buffer = std::make_unique<faststring>();
        }
        ZSTD_CCtx* ctx;
        std::unique_ptr<faststring> buffer;
        ~CContext() {
            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                    ExecEnv::GetInstance()->block_compression_mem_tracker());
            if (ctx) {
                ZSTD_freeCCtx(ctx);
            }
            buffer.reset();
        }
    };
    // Pooled decompression state: just a ZSTD_DCtx.
    class DContext {
        ENABLE_FACTORY_CREATOR(DContext);

    public:
        DContext() : ctx(nullptr) {}
        ZSTD_DCtx* ctx;
        ~DContext() {
            if (ctx) {
                ZSTD_freeDCtx(ctx);
            }
        }
    };

public:
    static ZstdBlockCompression* instance() {
        static ZstdBlockCompression s_instance;
        return &s_instance;
    }
    ~ZstdBlockCompression() {
        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                ExecEnv::GetInstance()->block_compression_mem_tracker());
        _ctx_c_pool.clear();
        _ctx_d_pool.clear();
    }

    size_t max_compressed_len(size_t len) override { return ZSTD_compressBound(len); }

    // Single-buffer compression delegates to the multi-slice overload.
    Status compress(const Slice& input, faststring* output) override {
        std::vector<Slice> inputs {input};
        return compress(inputs, input.size, output);
    }

    // follow ZSTD official example
    // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
    //
    // On any ZSTD error the context is NOT returned to the pool (the Defer is
    // skipped via compress_failed), so the possibly-corrupt context is freed
    // when the unique_ptr goes out of scope.
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        std::unique_ptr<CContext> context;
        RETURN_IF_ERROR(_acquire_compression_ctx(context));
        bool compress_failed = false;
        Defer defer {[&] {
            if (!compress_failed) {
                _release_compression_ctx(std::move(context));
            }
        }};

        try {
            size_t max_len = max_compressed_len(uncompressed_size);
            Slice compressed_buf;
            // MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE is declared elsewhere in
            // this file; it bounds how large a compression may be and still
            // use the pooled scratch buffer.
            if (max_len > MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // use output directly
                output->resize(max_len);
                compressed_buf.data = reinterpret_cast<char*>(output->data());
                compressed_buf.size = max_len;
            } else {
                {
                    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
                            ExecEnv::GetInstance()->block_compression_mem_tracker());
                    // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE
                    context->buffer->resize(max_len);
                }
                compressed_buf.data = reinterpret_cast<char*>(context->buffer->data());
                compressed_buf.size = max_len;
            }

            // set compression level to default 3
            auto ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_compressionLevel,
                                              ZSTD_CLEVEL_DEFAULT);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter compression level error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }
            // set checksum flag to 1
            ret = ZSTD_CCtx_setParameter(context->ctx, ZSTD_c_checksumFlag, 1);
            if (ZSTD_isError(ret)) {
                return Status::InvalidArgument("ZSTD_CCtx_setParameter checksumFlag error: {}",
                                               ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
            }

            ZSTD_outBuffer out_buf = {compressed_buf.data, compressed_buf.size, 0};

            // Stream every slice through the context; the last slice uses
            // ZSTD_e_end to flush and close the frame.
            for (size_t i = 0; i < inputs.size(); i++) {
                ZSTD_inBuffer in_buf = {inputs[i].data, inputs[i].size, 0};

                bool last_input = (i == inputs.size() - 1);
                auto mode = last_input ? ZSTD_e_end : ZSTD_e_continue;

                bool finished = false;
                do {
                    // do compress
                    ret = ZSTD_compressStream2(context->ctx, &out_buf, &in_buf, mode);

                    if (ZSTD_isError(ret)) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 error: {}",
                                                     ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
                    }

                    // ret is ZSTD hint for needed output buffer size
                    if (ret > 0 && out_buf.pos == out_buf.size) {
                        compress_failed = true;
                        return Status::InternalError("ZSTD_compressStream2 output buffer full");
                    }

                    // For the final slice, ret == 0 means the frame is fully
                    // flushed; otherwise we loop until the slice is consumed.
                    finished = last_input ? (ret == 0) : (in_buf.pos == inputs[i].size);
                } while (!finished);
            }

            // set compressed size for caller
            output->resize(out_buf.pos);
            if (max_len <= MAX_COMPRESSION_BUFFER_SIZE_FOR_REUSE) {
                // Data was staged in the pooled scratch buffer; copy it out.
                output->assign_copy(reinterpret_cast<uint8_t*>(compressed_buf.data), out_buf.pos);
            }
        } catch (std::exception& e) {
            return Status::InternalError("Fail to do ZSTD compress due to exception {}", e.what());
        } catch (...) {
            // Do not set compress_failed to release context
            DCHECK(!compress_failed);
            return Status::InternalError("Fail to do ZSTD compress due to exception");
        }

        return Status::OK();
    }

    // One-shot decompression into a caller-sized buffer. On error the context
    // is dropped instead of being returned to the pool.
    Status decompress(const Slice& input, Slice* output) override {
        std::unique_ptr<DContext> context;
        bool decompress_failed = false;
        RETURN_IF_ERROR(_acquire_decompression_ctx(context));
        Defer defer {[&] {
            if (!decompress_failed) {
                _release_decompression_ctx(std::move(context));
            }
        }};

        size_t ret = ZSTD_decompressDCtx(context->ctx, output->data, output->size, input.data,
                                         input.size);
        if (ZSTD_isError(ret)) {
            decompress_failed = true;
            return Status::InternalError("ZSTD_decompressDCtx error: {}",
                                         ZSTD_getErrorString(ZSTD_getErrorCode(ret)));
        }

        // set decompressed size for caller
        output->size = ret;

        return Status::OK();
    }

private:
    // Pop a pooled compression context, or create one if the pool is empty.
    Status _acquire_compression_ctx(std::unique_ptr<CContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        if (_ctx_c_pool.empty()) {
            std::unique_ptr<CContext> localCtx = CContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD CContext");
            }
            // allocate the underlying ZSTD compression context
            localCtx->ctx = ZSTD_createCCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Failed to create ZSTD compress ctx");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_c_pool.back());
        _ctx_c_pool.pop_back();
        return Status::OK();
    }
    // Reset the session state and return the context to the pool.
    void _release_compression_ctx(std::unique_ptr<CContext> context) {
        DCHECK(context);
        auto ret = ZSTD_CCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_c_mutex);
        _ctx_c_pool.push_back(std::move(context));
    }

    // Pop a pooled decompression context, or create one if the pool is empty.
    Status _acquire_decompression_ctx(std::unique_ptr<DContext>& out) {
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        if (_ctx_d_pool.empty()) {
            std::unique_ptr<DContext> localCtx = DContext::create_unique();
            if (localCtx.get() == nullptr) {
                return Status::InvalidArgument("failed to new ZSTD DContext");
            }
            localCtx->ctx = ZSTD_createDCtx();
            if (localCtx->ctx == nullptr) {
                return Status::InvalidArgument("Fail to init ZSTD decompress context");
            }
            out = std::move(localCtx);
            return Status::OK();
        }
        out = std::move(_ctx_d_pool.back());
        _ctx_d_pool.pop_back();
        return Status::OK();
    }
    void _release_decompression_ctx(std::unique_ptr<DContext> context) {
        DCHECK(context);
        // reset ctx to start a new decompress session
        auto ret = ZSTD_DCtx_reset(context->ctx, ZSTD_reset_session_only);
        DCHECK(!ZSTD_isError(ret));
        std::lock_guard<std::mutex> l(_ctx_d_mutex);
        _ctx_d_pool.push_back(std::move(context));
    }

private:
    // Pools of reusable contexts; each pool is guarded by its own mutex.
    mutable std::mutex _ctx_c_mutex;
    mutable std::vector<std::unique_ptr<CContext>> _ctx_c_pool;

    mutable std::mutex _ctx_d_mutex;
    mutable std::vector<std::unique_ptr<DContext>> _ctx_d_pool;
};
1261 | | |
// Codec for gzip-wrapped deflate streams. Reuses ZlibBlockCompression but
// opens the z_stream with windowBits = MAX_WBITS + GZIP_CODEC so zlib emits /
// expects the gzip header and trailer.
class GzipBlockCompression : public ZlibBlockCompression {
public:
    static GzipBlockCompression* instance() {
        static GzipBlockCompression s_instance;
        return &s_instance;
    }
    ~GzipBlockCompression() override = default;

    // One-shot gzip compression of a single buffer; `output` is shrunk to the
    // exact compressed length on success.
    Status compress(const Slice& input, faststring* output) override {
        size_t max_len = max_compressed_len(input.size);
        output->resize(max_len);

        z_stream z_strm = {};
        z_strm.zalloc = Z_NULL;
        z_strm.zfree = Z_NULL;
        z_strm.opaque = Z_NULL;

        int zres = deflateInit2(&z_strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
                                8, Z_DEFAULT_STRATEGY);

        if (zres == Z_MEM_ERROR) {
            throw Exception(Status::MemoryLimitExceeded(
                    "Fail to init ZLib compress, error={}, res={}", zError(zres), zres));
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to init ZLib compress, error={}, res={}",
                                         zError(zres), zres);
        }

        z_strm.next_in = (Bytef*)input.get_data();
        z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.get_size());
        z_strm.next_out = (Bytef*)output->data();
        z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size());

        // Single deflate call with Z_FINISH: the output buffer was sized by
        // max_compressed_len(), so one pass is expected to complete the stream.
        zres = deflate(&z_strm, Z_FINISH);
        if (zres != Z_OK && zres != Z_STREAM_END) {
            return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
                                         zError(zres), zres);
        }

        output->resize(z_strm.total_out);
        zres = deflateEnd(&z_strm);
        if (zres == Z_DATA_ERROR) {
            return Status::InvalidArgument("Fail to end zlib compress");
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to end zlib compress");
        }
        return Status::OK();
    }

    // Streaming gzip compression of several slices into one stream.
    Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
                    faststring* output) override {
        size_t max_len = max_compressed_len(uncompressed_size);
        output->resize(max_len);

        z_stream zstrm;
        zstrm.zalloc = Z_NULL;
        zstrm.zfree = Z_NULL;
        zstrm.opaque = Z_NULL;
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
                                 8, Z_DEFAULT_STRATEGY);
        if (zres == Z_MEM_ERROR) {
            throw Exception(Status::MemoryLimitExceeded(
                    "Fail to init ZLib stream compress, error={}, res={}", zError(zres), zres));
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to init ZLib stream compress, error={}, res={}",
                                         zError(zres), zres);
        }

        // We assume `output` (sized with max_compressed_len above) can hold
        // the whole compressed stream.
        zstrm.next_out = (Bytef*)output->data();
        zstrm.avail_out = cast_set<decltype(zstrm.avail_out)>(output->size());
        for (int i = 0; i < inputs.size(); ++i) {
            if (inputs[i].size == 0) {
                continue;
            }
            zstrm.next_in = (Bytef*)inputs[i].data;
            zstrm.avail_in = cast_set<decltype(zstrm.avail_in)>(inputs[i].size);
            // NOTE(review): if the last slice is empty it is skipped above and
            // Z_FINISH is never issued — confirm callers never pass a trailing
            // empty slice (same pattern as ZlibBlockCompression).
            int flush = (i == (inputs.size() - 1)) ? Z_FINISH : Z_NO_FLUSH;

            zres = deflate(&zstrm, flush);
            if (zres != Z_OK && zres != Z_STREAM_END) {
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
                                             zError(zres), zres);
            }
        }

        output->resize(zstrm.total_out);
        zres = deflateEnd(&zstrm);
        if (zres == Z_DATA_ERROR) {
            return Status::InvalidArgument("Fail to do deflateEnd on ZLib stream, error={}, res={}",
                                           zError(zres), zres);
        } else if (zres != Z_OK) {
            return Status::InternalError("Fail to do deflateEnd on ZLib stream, error={}, res={}",
                                         zError(zres), zres);
        }
        return Status::OK();
    }

    // Inflate a gzip stream into a caller-sized buffer.
    // NOTE(review): unlike the zlib codec, output->size is not updated to the
    // actual decompressed length — callers presumably preallocate the exact
    // size; confirm at call sites.
    Status decompress(const Slice& input, Slice* output) override {
        z_stream z_strm = {};
        z_strm.zalloc = Z_NULL;
        z_strm.zfree = Z_NULL;
        z_strm.opaque = Z_NULL;

        int ret = inflateInit2(&z_strm, MAX_WBITS + GZIP_CODEC);
        if (ret != Z_OK) {
            return Status::InternalError("Fail to init ZLib decompress, error={}, res={}",
                                         zError(ret), ret);
        }

        // 1. set input and output
        z_strm.next_in = reinterpret_cast<Bytef*>(input.data);
        z_strm.avail_in = cast_set<decltype(z_strm.avail_in)>(input.size);
        z_strm.next_out = reinterpret_cast<Bytef*>(output->data);
        z_strm.avail_out = cast_set<decltype(z_strm.avail_out)>(output->size);

        if (z_strm.avail_out > 0) {
            // We only support non-streaming use case  for block decompressor
            ret = inflate(&z_strm, Z_FINISH);
            if (ret != Z_OK && ret != Z_STREAM_END) {
                (void)inflateEnd(&z_strm);
                if (ret == Z_MEM_ERROR) {
                    throw Exception(Status::MemoryLimitExceeded(
                            "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret));
                } else if (ret == Z_DATA_ERROR) {
                    return Status::InvalidArgument(
                            "Fail to do ZLib stream compress, error={}, res={}", zError(ret), ret);
                }
                return Status::InternalError("Fail to do ZLib stream compress, error={}, res={}",
                                             zError(ret), ret);
            }
        }
        (void)inflateEnd(&z_strm);

        return Status::OK();
    }

    // Upper bound on the gzip-compressed size: ask zlib via deflateBound()
    // using the same init parameters as compression.
    size_t max_compressed_len(size_t len) override {
        z_stream zstrm;
        zstrm.zalloc = Z_NULL;
        zstrm.zfree = Z_NULL;
        zstrm.opaque = Z_NULL;
        auto zres = deflateInit2(&zstrm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + GZIP_CODEC,
                                 MEM_LEVEL, Z_DEFAULT_STRATEGY);
        if (zres != Z_OK) {
            // Fall back to zlib estimate logic for deflate, notice this may
            // cause decompress error
            LOG(WARNING) << "Fail to do ZLib stream compress, error=" << zError(zres)
                         << ", res=" << zres;
            return ZlibBlockCompression::max_compressed_len(len);
        } else {
            zres = deflateEnd(&zstrm);
            if (zres != Z_OK) {
                LOG(WARNING) << "Fail to do deflateEnd on ZLib stream, error=" << zError(zres)
                             << ", res=" << zres;
            }
            // Mark, maintainer of zlib, has stated that 12 needs to be added to
            // result for gzip
            // http://compgroups.net/comp.unix.programmer/gzip-compressing-an-in-memory-string-usi/54854
            // To have a safe upper bound for "wrapper variations", we add 32 to
            // estimate
            auto upper_bound = deflateBound(&zstrm, len) + 32;
            return upper_bound;
        }
    }

private:
    // Magic number for zlib, see https://zlib.net/manual.html for more details.
    // Adding 16 to windowBits selects the gzip wrapper in deflateInit2/inflateInit2.
    const static int GZIP_CODEC = 16; // gzip
    // The memLevel parameter specifies how much memory should be allocated for
    // the internal compression state.
    const static int MEM_LEVEL = 8;
};
1435 | | |
1436 | | // Only used on x86 or x86_64 |
1437 | | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
1438 | | defined(__i386) || defined(_M_IX86) |
1439 | | class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression { |
1440 | | public: |
1441 | 0 | GzipBlockCompressionByLibdeflate() : GzipBlockCompression() {} |
1442 | 0 | static GzipBlockCompressionByLibdeflate* instance() { |
1443 | 0 | static GzipBlockCompressionByLibdeflate s_instance; |
1444 | 0 | return &s_instance; |
1445 | 0 | } |
1446 | 0 | ~GzipBlockCompressionByLibdeflate() override = default; |
1447 | | |
1448 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1449 | 0 | if (input.empty()) { |
1450 | 0 | output->size = 0; |
1451 | 0 | return Status::OK(); |
1452 | 0 | } |
1453 | 0 | thread_local std::unique_ptr<libdeflate_decompressor, void (*)(libdeflate_decompressor*)> |
1454 | 0 | decompressor {libdeflate_alloc_decompressor(), libdeflate_free_decompressor}; |
1455 | 0 | if (!decompressor) { |
1456 | 0 | return Status::InternalError("libdeflate_alloc_decompressor error."); |
1457 | 0 | } |
1458 | 0 | std::size_t out_len; |
1459 | 0 | auto result = libdeflate_gzip_decompress(decompressor.get(), input.data, input.size, |
1460 | 0 | output->data, output->size, &out_len); |
1461 | 0 | if (result != LIBDEFLATE_SUCCESS) { |
1462 | 0 | return Status::InternalError("libdeflate_gzip_decompress error, res={}", result); |
1463 | 0 | } |
1464 | 0 | return Status::OK(); |
1465 | 0 | } |
1466 | | }; |
1467 | | #endif |
1468 | | |
// Decompress-only LZO codec; used by the Parquet reader for LZO-encoded
// pages. compress() is intentionally not implemented.
class LzoBlockCompression final : public BlockCompressionCodec {
public:
    // Meyers-singleton accessor.
    static LzoBlockCompression* instance() {
        static LzoBlockCompression s_instance;
        return &s_instance;
    }

    // Compression is not supported for this codec.
    Status compress(const Slice& input, faststring* output) override {
        return Status::InvalidArgument("not impl lzo compress.");
    }
    // No meaningful bound since compress() is unsupported.
    size_t max_compressed_len(size_t len) override { return 0; };
    // Decompress an LZO stream laid out as framed blocks (layout described
    // below). `output` must be pre-sized to hold the full uncompressed data;
    // on success the decoded bytes are written into output->data.
    Status decompress(const Slice& input, Slice* output) override {
        auto* input_ptr = input.data;
        auto remain_input_size = input.size;
        auto* output_ptr = output->data;
        auto remain_output_size = output->size;
        auto* output_limit = output->data + output->size;

        // Example:
        // OriginData(The original data will be divided into several large data block.) :
        //      large data block1 | large data block2 | large data block3 | ....
        // The large data block will be divided into several small data block.
        // Suppose a large data block is divided into three small blocks:
        // large data block1:            | small block1 | small block2 | small block3 |
        // CompressData:   <A [B1 compress(small block1) ] [B2 compress(small block2) ] [B3 compress(small block3)]>
        //
        // A : original length of the current block of large data block.
        // sizeof(A) = 4 bytes.
        // A = length(small block1) + length(small block2) + length(small block3)
        // Bx : length of small data block bx.
        // sizeof(Bx) = 4 bytes.
        // Bx = length(compress(small blockx))
        try {
            while (remain_input_size > 0) {
                // Header A: big-endian uncompressed length of the next large block.
                if (remain_input_size < 4) {
                    return Status::InvalidArgument(
                            "Need more input buffer to get large_block_uncompressed_len.");
                }

                uint32_t large_block_uncompressed_len = BigEndian::Load32(input_ptr);
                input_ptr += 4;
                remain_input_size -= 4;

                if (remain_output_size < large_block_uncompressed_len) {
                    return Status::InvalidArgument(
                            "Need more output buffer to get uncompressed data.");
                }

                // Decode small blocks until the whole large block has been produced.
                while (large_block_uncompressed_len > 0) {
                    // Header Bx: big-endian compressed length of the next small block.
                    if (remain_input_size < 4) {
                        return Status::InvalidArgument(
                                "Need more input buffer to get small_block_compressed_len.");
                    }

                    uint32_t small_block_compressed_len = BigEndian::Load32(input_ptr);
                    input_ptr += 4;
                    remain_input_size -= 4;

                    if (remain_input_size < small_block_compressed_len) {
                        return Status::InvalidArgument(
                                "Need more input buffer to decompress small block.");
                    }

                    // Returns the number of bytes written; writes never pass
                    // output_limit (orc::lzoDecompress may throw ParseError).
                    auto small_block_uncompressed_len =
                            orc::lzoDecompress(input_ptr, input_ptr + small_block_compressed_len,
                                               output_ptr, output_limit);

                    input_ptr += small_block_compressed_len;
                    remain_input_size -= small_block_compressed_len;

                    output_ptr += small_block_uncompressed_len;
                    large_block_uncompressed_len -= small_block_uncompressed_len;
                    remain_output_size -= small_block_uncompressed_len;
                }
            }
        } catch (const orc::ParseError& e) {
            // Prevent the BE from hanging when orc::lzoDecompress throws.
            return Status::InternalError("Fail to do LZO decompress, error={}", e.what());
        }
        return Status::OK();
    }
};
1551 | | |
1552 | | class BrotliBlockCompression final : public BlockCompressionCodec { |
1553 | | public: |
1554 | 0 | static BrotliBlockCompression* instance() { |
1555 | 0 | static BrotliBlockCompression s_instance; |
1556 | 0 | return &s_instance; |
1557 | 0 | } |
1558 | | |
1559 | 0 | Status compress(const Slice& input, faststring* output) override { |
1560 | 0 | return Status::InvalidArgument("not impl brotli compress."); |
1561 | 0 | } |
1562 | | |
1563 | 0 | size_t max_compressed_len(size_t len) override { return 0; }; |
1564 | | |
1565 | 0 | Status decompress(const Slice& input, Slice* output) override { |
1566 | | // The size of output buffer is always equal to the umcompressed length. |
1567 | 0 | BrotliDecoderResult result = BrotliDecoderDecompress( |
1568 | 0 | input.get_size(), reinterpret_cast<const uint8_t*>(input.get_data()), &output->size, |
1569 | 0 | reinterpret_cast<uint8_t*>(output->data)); |
1570 | 0 | if (result != BROTLI_DECODER_RESULT_SUCCESS) { |
1571 | 0 | return Status::InternalError("Brotli decompression failed, result={}", result); |
1572 | 0 | } |
1573 | 0 | return Status::OK(); |
1574 | 0 | } |
1575 | | }; |
1576 | | |
1577 | | Status get_block_compression_codec(segment_v2::CompressionTypePB type, |
1578 | 29.4k | BlockCompressionCodec** codec) { |
1579 | 29.4k | switch (type) { |
1580 | 133 | case segment_v2::CompressionTypePB::NO_COMPRESSION: |
1581 | 133 | *codec = nullptr; |
1582 | 133 | break; |
1583 | 47 | case segment_v2::CompressionTypePB::SNAPPY: |
1584 | 47 | *codec = SnappyBlockCompression::instance(); |
1585 | 47 | break; |
1586 | 368 | case segment_v2::CompressionTypePB::LZ4: |
1587 | 368 | *codec = Lz4BlockCompression::instance(); |
1588 | 368 | break; |
1589 | 28.4k | case segment_v2::CompressionTypePB::LZ4F: |
1590 | 28.4k | *codec = Lz4fBlockCompression::instance(); |
1591 | 28.4k | break; |
1592 | 2 | case segment_v2::CompressionTypePB::LZ4HC: |
1593 | 2 | *codec = Lz4HCBlockCompression::instance(); |
1594 | 2 | break; |
1595 | 2 | case segment_v2::CompressionTypePB::ZLIB: |
1596 | 2 | *codec = ZlibBlockCompression::instance(); |
1597 | 2 | break; |
1598 | 504 | case segment_v2::CompressionTypePB::ZSTD: |
1599 | 504 | *codec = ZstdBlockCompression::instance(); |
1600 | 504 | break; |
1601 | 0 | default: |
1602 | 0 | return Status::InternalError("unknown compression type({})", type); |
1603 | 29.4k | } |
1604 | | |
1605 | 29.4k | return Status::OK(); |
1606 | 29.4k | } |
1607 | | |
1608 | | // this can only be used in hive text write |
1609 | 0 | Status get_block_compression_codec(TFileCompressType::type type, BlockCompressionCodec** codec) { |
1610 | 0 | switch (type) { |
1611 | 0 | case TFileCompressType::PLAIN: |
1612 | 0 | *codec = nullptr; |
1613 | 0 | break; |
1614 | 0 | case TFileCompressType::ZLIB: |
1615 | 0 | *codec = ZlibBlockCompression::instance(); |
1616 | 0 | break; |
1617 | 0 | case TFileCompressType::GZ: |
1618 | 0 | *codec = GzipBlockCompression::instance(); |
1619 | 0 | break; |
1620 | 0 | case TFileCompressType::BZ2: |
1621 | 0 | *codec = Bzip2BlockCompression::instance(); |
1622 | 0 | break; |
1623 | 0 | case TFileCompressType::LZ4BLOCK: |
1624 | 0 | *codec = HadoopLz4BlockCompression::instance(); |
1625 | 0 | break; |
1626 | 0 | case TFileCompressType::SNAPPYBLOCK: |
1627 | 0 | *codec = HadoopSnappyBlockCompression::instance(); |
1628 | 0 | break; |
1629 | 0 | case TFileCompressType::ZSTD: |
1630 | 0 | *codec = ZstdBlockCompression::instance(); |
1631 | 0 | break; |
1632 | 0 | default: |
1633 | 0 | return Status::InternalError("unsupport compression type({}) int hive text", type); |
1634 | 0 | } |
1635 | | |
1636 | 0 | return Status::OK(); |
1637 | 0 | } |
1638 | | |
1639 | | Status get_block_compression_codec(tparquet::CompressionCodec::type parquet_codec, |
1640 | 108 | BlockCompressionCodec** codec) { |
1641 | 108 | switch (parquet_codec) { |
1642 | 0 | case tparquet::CompressionCodec::UNCOMPRESSED: |
1643 | 0 | *codec = nullptr; |
1644 | 0 | break; |
1645 | 108 | case tparquet::CompressionCodec::SNAPPY: |
1646 | 108 | *codec = SnappyBlockCompression::instance(); |
1647 | 108 | break; |
1648 | 0 | case tparquet::CompressionCodec::LZ4_RAW: // we can use LZ4 compression algorithm parse LZ4_RAW |
1649 | 0 | case tparquet::CompressionCodec::LZ4: |
1650 | 0 | *codec = HadoopLz4BlockCompression::instance(); |
1651 | 0 | break; |
1652 | 0 | case tparquet::CompressionCodec::ZSTD: |
1653 | 0 | *codec = ZstdBlockCompression::instance(); |
1654 | 0 | break; |
1655 | 0 | case tparquet::CompressionCodec::GZIP: |
1656 | | // Only used on x86 or x86_64 |
1657 | 0 | #if defined(__x86_64__) || defined(_M_X64) || defined(i386) || defined(__i386__) || \ |
1658 | 0 | defined(__i386) || defined(_M_IX86) |
1659 | 0 | *codec = GzipBlockCompressionByLibdeflate::instance(); |
1660 | | #else |
1661 | | *codec = GzipBlockCompression::instance(); |
1662 | | #endif |
1663 | 0 | break; |
1664 | 0 | case tparquet::CompressionCodec::LZO: |
1665 | 0 | *codec = LzoBlockCompression::instance(); |
1666 | 0 | break; |
1667 | 0 | case tparquet::CompressionCodec::BROTLI: |
1668 | 0 | *codec = BrotliBlockCompression::instance(); |
1669 | 0 | break; |
1670 | 0 | default: |
1671 | 0 | return Status::InternalError("unknown compression type({})", parquet_codec); |
1672 | 108 | } |
1673 | | |
1674 | 108 | return Status::OK(); |
1675 | 108 | } |
1676 | | |
1677 | | #include "common/compile_check_end.h" |
1678 | | } // namespace doris |