be/src/format/parquet/level_decoder.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/parquet/level_decoder.h" |
19 | | |
20 | | #include <gen_cpp/parquet_types.h> |
21 | | |
22 | | #include <algorithm> |
23 | | |
24 | | #include "format/parquet/parquet_common.h" |
25 | | #include "util/bit_stream_utils.inline.h" |
26 | | #include "util/bit_util.h" |
27 | | #include "util/coding.h" |
28 | | |
// Byte length of the little-endian fixed32 size prefix that LevelDecoder::init()
// reads, and then skips, in front of RLE-encoded level data.
29 | | static constexpr size_t V1_LEVEL_SIZE = 4; |
30 | | #include "common/cast_set.h" |
31 | | #include "common/compile_check_begin.h" |
32 | | |
33 | | doris::Status doris::LevelDecoder::init(doris::Slice* slice, tparquet::Encoding::type encoding, |
34 | 938k | doris::level_t max_level, uint32_t num_levels) { |
35 | 938k | _encoding = encoding; |
36 | 938k | _bit_width = cast_set<level_t>(BitUtil::log2(max_level + 1)); |
37 | 938k | _max_level = max_level; |
38 | 938k | _num_levels = num_levels; |
39 | 938k | switch (encoding) { |
40 | 938k | case tparquet::Encoding::RLE: { |
41 | 938k | if (slice->size < V1_LEVEL_SIZE) { |
42 | 0 | return Status::Corruption("Wrong parquet level format"); |
43 | 0 | } |
44 | | |
45 | 938k | uint8_t* data = (uint8_t*)slice->data; |
46 | 938k | uint32_t num_bytes = decode_fixed32_le(data); |
47 | 938k | if (num_bytes > slice->size - V1_LEVEL_SIZE) { |
48 | 1 | return Status::Corruption("Wrong parquet level format"); |
49 | 1 | } |
50 | 938k | _rle_decoder = RleDecoder<level_t>(data + V1_LEVEL_SIZE, num_bytes, _bit_width); |
51 | | |
52 | 938k | slice->data += V1_LEVEL_SIZE + num_bytes; |
53 | 938k | slice->size -= V1_LEVEL_SIZE + num_bytes; |
54 | 938k | break; |
55 | 938k | } |
56 | 1 | case tparquet::Encoding::BIT_PACKED: { |
57 | 1 | uint32_t num_bits = num_levels * _bit_width; |
58 | 1 | uint32_t num_bytes = BitUtil::RoundUpNumBytes(num_bits); |
59 | 1 | if (num_bytes > slice->size) { |
60 | 0 | return Status::Corruption("Wrong parquet level format"); |
61 | 0 | } |
62 | 1 | _bit_packed_decoder = BitReader((uint8_t*)slice->data, num_bytes); |
63 | | |
64 | 1 | slice->data += num_bytes; |
65 | 1 | slice->size -= num_bytes; |
66 | 1 | break; |
67 | 1 | } |
68 | 1 | default: |
69 | 1 | return Status::IOError("Unsupported encoding for parquet level"); |
70 | 938k | } |
71 | 938k | return Status::OK(); |
72 | 938k | } |
73 | | |
74 | | doris::Status doris::LevelDecoder::init_v2(const doris::Slice& levels, doris::level_t max_level, |
75 | 14.0k | uint32_t num_levels) { |
76 | 14.0k | _encoding = tparquet::Encoding::RLE; |
77 | 14.0k | _bit_width = cast_set<level_t>(BitUtil::log2(max_level + 1)); |
78 | 14.0k | _max_level = max_level; |
79 | 14.0k | _num_levels = num_levels; |
80 | 14.0k | size_t byte_length = levels.size; |
81 | 14.0k | _rle_decoder = |
82 | 14.0k | RleDecoder<level_t>((uint8_t*)levels.data, cast_set<int>(byte_length), _bit_width); |
83 | 14.0k | return Status::OK(); |
84 | 14.0k | } |
85 | | |
86 | 175k | size_t doris::LevelDecoder::get_levels(doris::level_t* levels, size_t n) { |
87 | | // toto template. |
88 | 175k | if (_encoding == tparquet::Encoding::RLE) { |
89 | 175k | n = std::min((size_t)_num_levels, n); |
90 | 175k | auto num_decoded = _rle_decoder.get_values(levels, n); |
91 | 175k | _num_levels -= num_decoded; |
92 | 175k | return num_decoded; |
93 | 175k | } else if (_encoding == tparquet::Encoding::BIT_PACKED) { |
94 | 1 | n = std::min((size_t)_num_levels, n); |
95 | 4 | for (size_t i = 0; i < n; ++i) { |
96 | 3 | if (!_bit_packed_decoder.GetValue(_bit_width, &levels[i])) { |
97 | 0 | throw doris::Exception(ErrorCode::INTERNAL_ERROR, |
98 | 0 | "Failed to decode BIT_PACKED levels"); |
99 | 0 | } |
100 | 3 | } |
101 | 1 | _num_levels -= n; |
102 | 1 | return n; |
103 | 1 | } |
104 | 0 | return 0; |
105 | 175k | } |
106 | | #include "common/compile_check_end.h" |