be/src/format/parquet/decoder.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/parquet_types.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include <cstddef> |
24 | | #include <cstdint> |
25 | | #include <memory> |
26 | | #include <ostream> |
27 | | #include <vector> |
28 | | |
29 | | #include "common/status.h" |
30 | | #include "core/assert_cast.h" |
31 | | #include "core/column/column.h" |
32 | | #include "core/column/column_dictionary.h" |
33 | | #include "core/column/column_vector.h" |
34 | | #include "core/custom_allocator.h" |
35 | | #include "core/data_type/data_type.h" |
36 | | #include "core/data_type/data_type_decimal.h" // IWYU pragma: keep |
37 | | #include "core/data_type/data_type_nullable.h" |
38 | | #include "core/pod_array_fwd.h" |
39 | | #include "core/types.h" |
40 | | #include "format/parquet/parquet_common.h" |
41 | | #include "util/rle_encoding.h" |
42 | | #include "util/slice.h" |
43 | | |
44 | | namespace doris { |
45 | | #include "common/compile_check_begin.h" |
46 | | template <typename T> |
47 | | class ColumnStr; |
48 | | using ColumnString = ColumnStr<UInt32>; |
49 | | |
50 | | class Decoder { |
 | | // Abstract base for Parquet value decoders: subclasses must implement decode_values() and skip_values(). |
51 | | public: |
52 | 1.31k | Decoder() = default; |
53 | 1.30k | virtual ~Decoder() = default; |
54 | | |
 | | // Factory declared here and defined elsewhere. |
 | | // NOTE(review): presumably builds the decoder matching `type` and `encoding` and hands it back through `decoder` - confirm against the definition. |
55 | | static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding, |
56 | | std::unique_ptr<Decoder>& decoder); |
57 | | |
58 | | // Record the value length used for fixed-length types; _type_length stays -1 until this is called. |
59 | 1.20k | void set_type_length(int32_t type_length) { _type_length = type_length; } |
60 | | |
61 | | // Set the data to be decoded and reset _offset to 0. Only the pointer is stored; the slice is not copied. |
62 | 258 | virtual Status set_data(Slice* data) { |
63 | 258 | _data = data; |
64 | 258 | _offset = 0; |
65 | 258 | return Status::OK(); |
66 | 258 | } |
67 | | |
68 | | // Write a batch of decoded values into doris_column. Pure virtual: every concrete decoder implements it. |
69 | | virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, |
70 | | ColumnSelectVector& select_vector, bool is_dict_filter) = 0; |
71 | | |
 | | // Pure virtual. NOTE(review): by its name this advances past num_values values without writing them to a column - confirm per implementation. |
72 | | virtual Status skip_values(size_t num_values) = 0; |
73 | | |
 | | // Default implementation ignores its arguments and returns Status::NotSupported. |
74 | | virtual Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length, |
75 | 0 | size_t num_values) { |
76 | 0 | return Status::NotSupported("set_dict is not supported"); |
77 | 0 | } |
78 | | |
 | | // Default implementation ignores doris_column and returns Status::NotSupported. |
79 | 0 | virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column) { |
80 | 0 | return Status::NotSupported("read_dict_values_to_column is not supported"); |
81 | 0 | } |
82 | | |
 | | // Default implementation throws doris::Exception(NOT_IMPLEMENTED_ERROR) instead of returning an error Result. |
83 | | virtual Result<MutableColumnPtr> convert_dict_column_to_string_column( |
84 | 0 | const ColumnInt32* dict_column) { |
85 | 0 | throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, |
86 | 0 | "Method convert_dict_column_to_string_column is not supported"); |
87 | 0 | } |
88 | | |
89 | | protected: |
 | | // Value length for fixed-length types. Initialised to -1 ("not set") so a decoder used before set_type_length() never reads an indeterminate value. |
90 | | int32_t _type_length = -1; |
 | | // Slice handed to set_data(); this class only keeps the pointer and never frees it. |
91 | | Slice* _data = nullptr; |
 | | // Reset to 0 by set_data(). NOTE(review): presumably the read position inside _data - confirm in the subclasses. |
92 | | uint32_t _offset = 0; |
93 | | }; |
94 | | |
95 | | class BaseDictDecoder : public Decoder { |
96 | | public: |
97 | 924 | BaseDictDecoder() = default; |
98 | 924 | ~BaseDictDecoder() override = default; |
99 | | |
100 | | // Set the data to be decoded: the first byte is taken as the index bit width and the remaining size - 1 bytes feed a freshly created RleBatchDecoder. NOTE(review): the first byte is read without checking that data is non-empty - confirm callers guarantee it. |
101 | 974 | Status set_data(Slice* data) override { |
102 | 974 | _data = data; |
103 | 974 | _offset = 0; |
104 | 974 | uint8_t bit_width = *data->data; |
105 | 974 | _index_batch_decoder = std::make_unique<RleBatchDecoder<uint32_t>>( |
106 | 974 | reinterpret_cast<uint8_t*>(data->data) + 1, static_cast<int>(data->size) - 1, |
107 | 974 | bit_width); |
108 | 974 | return Status::OK(); |
109 | 974 | } |
110 | | |
111 | | protected: |
112 | | /** |
113 | | * Append dictionary codes to doris_column, which must be a ColumnDictI32 (or a ColumnInt32 when is_dict_filter is set); the codes must already have been read into _indexes. |
114 | | * For each run reported by select_vector: CONTENT copies run_length codes, NULL_DATA inserts run_length defaults, FILTERED_CONTENT only skips run_length codes, and FILTERED_NULL does nothing. |
115 | | */ |
116 | | template <bool has_filter> |
117 | | Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector, |
118 | 12 | bool is_dict_filter) { |
119 | 12 | DCHECK(doris_column->is_column_dictionary() || is_dict_filter); |
120 | 12 | size_t dict_index = 0; |
121 | 12 | ColumnSelectVector::DataReadType read_type; |
122 | 12 | PaddedPODArray<Int32>& column_data = |
123 | 12 | doris_column->is_column_dictionary() |
124 | 12 | ? assert_cast<ColumnDictI32&>(*doris_column).get_data() |
125 | 12 | : assert_cast<ColumnInt32&>(*doris_column).get_data(); |
126 | 64 | while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) { |
127 | 52 | switch (read_type) { |
128 | 28 | case ColumnSelectVector::CONTENT: { |
129 | 28 | uint32_t* start_index = _indexes.data(); |
130 | 28 | column_data.insert(start_index + dict_index, start_index + dict_index + run_length); |
131 | 28 | dict_index += run_length; |
132 | 28 | break; |
133 | 0 | } |
134 | 8 | case ColumnSelectVector::NULL_DATA: { |
135 | 8 | doris_column->insert_many_defaults(run_length); |
136 | 8 | break; |
137 | 0 | } |
138 | 16 | case ColumnSelectVector::FILTERED_CONTENT: { |
139 | 16 | dict_index += run_length; |
140 | 16 | break; |
141 | 0 | } |
142 | 0 | case ColumnSelectVector::FILTERED_NULL: { |
143 | 0 | break; |
144 | 0 | } |
145 | 52 | } |
146 | 52 | } |
147 | 12 | return Status::OK(); |
148 | 12 | } _ZN5doris15BaseDictDecoder19_decode_dict_valuesILb1EEENS_6StatusERNS_3COWINS_7IColumnEE11mutable_ptrIS4_EERNS_18ColumnSelectVectorEb Line | Count | Source | 118 | 8 | bool is_dict_filter) { | 119 | 8 | DCHECK(doris_column->is_column_dictionary() || is_dict_filter); | 120 | 8 | size_t dict_index = 0; | 121 | 8 | ColumnSelectVector::DataReadType read_type; | 122 | 8 | PaddedPODArray<Int32>& column_data = | 123 | 8 | doris_column->is_column_dictionary() | 124 | 8 | ? assert_cast<ColumnDictI32&>(*doris_column).get_data() | 125 | 8 | : assert_cast<ColumnInt32&>(*doris_column).get_data(); | 126 | 56 | while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) { | 127 | 48 | switch (read_type) { | 128 | 24 | case ColumnSelectVector::CONTENT: { | 129 | 24 | uint32_t* start_index = _indexes.data(); | 130 | 24 | column_data.insert(start_index + dict_index, start_index + dict_index + run_length); | 131 | 24 | dict_index += run_length; | 132 | 24 | break; | 133 | 0 | } | 134 | 8 | case ColumnSelectVector::NULL_DATA: { | 135 | 8 | doris_column->insert_many_defaults(run_length); | 136 | 8 | break; | 137 | 0 | } | 138 | 16 | case ColumnSelectVector::FILTERED_CONTENT: { | 139 | 16 | dict_index += run_length; | 140 | 16 | break; | 141 | 0 | } | 142 | 0 | case ColumnSelectVector::FILTERED_NULL: { | 143 | 0 | break; | 144 | 0 | } | 145 | 48 | } | 146 | 48 | } | 147 | 8 | return Status::OK(); | 148 | 8 | } |
_ZN5doris15BaseDictDecoder19_decode_dict_valuesILb0EEENS_6StatusERNS_3COWINS_7IColumnEE11mutable_ptrIS4_EERNS_18ColumnSelectVectorEb Line | Count | Source | 118 | 4 | bool is_dict_filter) { | 119 | 4 | DCHECK(doris_column->is_column_dictionary() || is_dict_filter); | 120 | 4 | size_t dict_index = 0; | 121 | 4 | ColumnSelectVector::DataReadType read_type; | 122 | 4 | PaddedPODArray<Int32>& column_data = | 123 | 4 | doris_column->is_column_dictionary() | 124 | 4 | ? assert_cast<ColumnDictI32&>(*doris_column).get_data() | 125 | 4 | : assert_cast<ColumnInt32&>(*doris_column).get_data(); | 126 | 8 | while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) { | 127 | 4 | switch (read_type) { | 128 | 4 | case ColumnSelectVector::CONTENT: { | 129 | 4 | uint32_t* start_index = _indexes.data(); | 130 | 4 | column_data.insert(start_index + dict_index, start_index + dict_index + run_length); | 131 | 4 | dict_index += run_length; | 132 | 4 | break; | 133 | 0 | } | 134 | 0 | case ColumnSelectVector::NULL_DATA: { | 135 | 0 | doris_column->insert_many_defaults(run_length); | 136 | 0 | break; | 137 | 0 | } | 138 | 0 | case ColumnSelectVector::FILTERED_CONTENT: { | 139 | 0 | dict_index += run_length; | 140 | 0 | break; | 141 | 0 | } | 142 | 0 | case ColumnSelectVector::FILTERED_NULL: { | 143 | 0 | break; | 144 | 0 | } | 145 | 4 | } | 146 | 4 | } | 147 | 4 | return Status::OK(); | 148 | 4 | } |
|
149 | | |
150 | 270 | Status skip_values(size_t num_values) override { |
151 | 270 | _indexes.resize(num_values); |
152 | 270 | _index_batch_decoder->GetBatch(_indexes.data(), cast_set<uint32_t>(num_values)); |
153 | 270 | return Status::OK(); |
154 | 270 | } |
155 | | |
156 | | // Dictionary-encoding state: the dictionary buffer, the index decoder created by set_data(), and the buffer of decoded indexes that skip_values() fills and _decode_dict_values() reads. |
157 | | DorisUniqueBufferPtr<uint8_t> _dict; |
158 | | std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder; |
159 | | std::vector<uint32_t> _indexes; |
160 | | }; |
161 | | #include "common/compile_check_end.h" |
162 | | |
163 | | } // namespace doris |