be/src/format/parquet/parquet_column_convert.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/parquet/parquet_column_convert.h" |
19 | | |
20 | | #include <cctz/time_zone.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include "common/cast_set.h" |
24 | | #include "core/column/column_fixed_length_object.h" |
25 | | #include "core/column/column_nullable.h" |
26 | | #include "core/data_type/data_type_fixed_length_object.h" |
27 | | #include "core/data_type/data_type_nullable.h" |
28 | | #include "core/data_type/define_primitive_type.h" |
29 | | #include "core/data_type/primitive_type.h" |
30 | | |
31 | | namespace doris::parquet { |
32 | | const cctz::time_zone ConvertParams::utc0 = cctz::utc_time_zone(); |
33 | | |
// X-macro over the decimal logical types that have a dedicated
// physical -> decimal converter. `APPLY` is expanded once per listed type, in
// the order written below. TYPE_DECIMALV2 is intentionally absent from this
// list even though is_decimal_type() accepts it.
#define FOR_LOGICAL_DECIMAL_TYPES(APPLY) \
    APPLY(TYPE_DECIMAL32)                \
    APPLY(TYPE_DECIMAL64)                \
    APPLY(TYPE_DECIMAL128I)              \
    APPLY(TYPE_DECIMAL256)
39 | | |
40 | 223 | bool PhysicalToLogicalConverter::is_parquet_native_type(PrimitiveType type) { |
41 | 223 | switch (type) { |
42 | 12 | case TYPE_BOOLEAN: |
43 | 81 | case TYPE_INT: |
44 | 105 | case TYPE_BIGINT: |
45 | 135 | case TYPE_FLOAT: |
46 | 154 | case TYPE_DOUBLE: |
47 | 187 | case TYPE_STRING: |
48 | 187 | case TYPE_CHAR: |
49 | 187 | case TYPE_VARCHAR: |
50 | 187 | return true; |
51 | 36 | default: |
52 | 36 | return false; |
53 | 223 | } |
54 | 223 | } |
55 | | |
56 | 19 | bool PhysicalToLogicalConverter::is_decimal_type(doris::PrimitiveType type) { |
57 | 19 | switch (type) { |
58 | 0 | case TYPE_DECIMAL32: |
59 | 4 | case TYPE_DECIMAL64: |
60 | 4 | case TYPE_DECIMAL128I: |
61 | 4 | case TYPE_DECIMAL256: |
62 | 4 | case TYPE_DECIMALV2: |
63 | 4 | return true; |
64 | 15 | default: |
65 | 15 | return false; |
66 | 19 | } |
67 | 19 | } |
68 | | |
69 | | ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type src_physical_type, |
70 | | DataTypePtr src_logical_type, |
71 | | ColumnPtr& dst_logical_column, |
72 | | const DataTypePtr& dst_logical_type, |
73 | 319 | bool is_dict_filter) { |
74 | 319 | if (is_dict_filter) { |
75 | 0 | src_physical_type = tparquet::Type::INT32; |
76 | 0 | src_logical_type = DataTypeFactory::instance().create_data_type( |
77 | 0 | PrimitiveType::TYPE_INT, dst_logical_type->is_nullable()); |
78 | 0 | } |
79 | | |
80 | 319 | if (!_convert_params->is_type_compatibility && is_consistent() && |
81 | 319 | _logical_converter->is_consistent()) { |
82 | 274 | if (_cached_src_physical_type == nullptr) { |
83 | 112 | _cached_src_physical_type = dst_logical_type->is_nullable() |
84 | 112 | ? make_nullable(src_logical_type) |
85 | 112 | : remove_nullable(src_logical_type); |
86 | 112 | } |
87 | 274 | return dst_logical_column; |
88 | 274 | } |
89 | | |
90 | 45 | if (!_cached_src_physical_column) { |
91 | 34 | switch (src_physical_type) { |
92 | 0 | case tparquet::Type::type::BOOLEAN: |
93 | 0 | _cached_src_physical_type = std::make_shared<DataTypeUInt8>(); |
94 | 0 | break; |
95 | 23 | case tparquet::Type::type::INT32: |
96 | 23 | _cached_src_physical_type = std::make_shared<DataTypeInt32>(); |
97 | 23 | break; |
98 | 6 | case tparquet::Type::type::INT64: |
99 | 6 | _cached_src_physical_type = std::make_shared<DataTypeInt64>(); |
100 | 6 | break; |
101 | 0 | case tparquet::Type::type::FLOAT: |
102 | 0 | _cached_src_physical_type = std::make_shared<DataTypeFloat32>(); |
103 | 0 | break; |
104 | 0 | case tparquet::Type::type::DOUBLE: |
105 | 0 | _cached_src_physical_type = std::make_shared<DataTypeFloat64>(); |
106 | 0 | break; |
107 | 4 | case tparquet::Type::type::BYTE_ARRAY: |
108 | 4 | _cached_src_physical_type = std::make_shared<DataTypeString>(); |
109 | 4 | break; |
110 | 1 | case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY: |
111 | 1 | _cached_src_physical_type = std::make_shared<DataTypeFixedLengthObject>(); |
112 | 1 | break; |
113 | 0 | case tparquet::Type::type::INT96: |
114 | 0 | _cached_src_physical_type = std::make_shared<DataTypeInt8>(); |
115 | 0 | break; |
116 | 34 | } |
117 | 34 | const bool is_fixed_length_byte_array = |
118 | 34 | src_physical_type == tparquet::Type::type::FIXED_LEN_BYTE_ARRAY; |
119 | 34 | if (dst_logical_type->is_nullable()) { |
120 | 34 | MutableColumnPtr nested_physical_column; |
121 | 34 | if (is_fixed_length_byte_array) { |
122 | 1 | nested_physical_column = ColumnFixedLengthObject::create( |
123 | 1 | _convert_params->field_schema->parquet_schema.type_length); |
124 | 33 | } else { |
125 | 33 | nested_physical_column = _cached_src_physical_type->create_column(); |
126 | 33 | } |
127 | 34 | _cached_src_physical_column = ColumnNullable::create(std::move(nested_physical_column), |
128 | 34 | ColumnUInt8::create()); |
129 | 34 | _cached_src_physical_type = make_nullable(_cached_src_physical_type); |
130 | 34 | } else { |
131 | 0 | if (is_fixed_length_byte_array) { |
132 | 0 | _cached_src_physical_column = ColumnFixedLengthObject::create( |
133 | 0 | _convert_params->field_schema->parquet_schema.type_length); |
134 | 0 | } else { |
135 | 0 | _cached_src_physical_column = _cached_src_physical_type->create_column(); |
136 | 0 | } |
137 | 0 | } |
138 | 34 | } |
139 | | // remove the old cached data |
140 | 45 | auto cached_src_physical_column = IColumn::mutate(std::move(_cached_src_physical_column)); |
141 | 45 | cached_src_physical_column->clear(); |
142 | 45 | _cached_src_physical_column = std::move(cached_src_physical_column); |
143 | | |
144 | 45 | return _cached_src_physical_column; |
145 | 45 | } |
146 | | |
147 | | static void get_decimal_converter(const FieldSchema* field_schema, DataTypePtr src_logical_type, |
148 | | const DataTypePtr& dst_logical_type, |
149 | | ConvertParams* convert_params, |
150 | 4 | std::unique_ptr<PhysicalToLogicalConverter>& physical_converter) { |
151 | 4 | const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema; |
152 | 4 | if (is_decimal(dst_logical_type->get_primitive_type())) { |
153 | 4 | src_logical_type = create_decimal(parquet_schema.precision, parquet_schema.scale, false); |
154 | 4 | } |
155 | | |
156 | 4 | tparquet::Type::type src_physical_type = parquet_schema.type; |
157 | 4 | PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type(); |
158 | | |
159 | 4 | if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { |
160 | 2 | switch (src_logical_primitive) { |
161 | 0 | #define DISPATCH(LOGICAL_PTYPE) \ |
162 | 2 | case LOGICAL_PTYPE: { \ |
163 | 2 | physical_converter.reset( \ |
164 | 2 | new FixedSizeToDecimal<LOGICAL_PTYPE>(parquet_schema.type_length)); \ |
165 | 2 | break; \ |
166 | 2 | } |
167 | 2 | FOR_LOGICAL_DECIMAL_TYPES(DISPATCH) |
168 | 0 | #undef DISPATCH |
169 | 0 | default: |
170 | 0 | physical_converter = |
171 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
172 | 2 | } |
173 | 2 | } else if (src_physical_type == tparquet::Type::BYTE_ARRAY) { |
174 | 0 | switch (src_logical_primitive) { |
175 | 0 | #define DISPATCH(LOGICAL_PTYPE) \ |
176 | 0 | case LOGICAL_PTYPE: { \ |
177 | 0 | physical_converter.reset(new StringToDecimal<LOGICAL_PTYPE>()); \ |
178 | 0 | break; \ |
179 | 0 | } |
180 | 0 | FOR_LOGICAL_DECIMAL_TYPES(DISPATCH) |
181 | 0 | #undef DISPATCH |
182 | 0 | default: |
183 | 0 | physical_converter = |
184 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
185 | 0 | } |
186 | 2 | } else if (src_physical_type == tparquet::Type::INT32 || |
187 | 2 | src_physical_type == tparquet::Type::INT64) { |
188 | 2 | switch (src_logical_primitive) { |
189 | 0 | #define DISPATCH(LOGICAL_PTYPE) \ |
190 | 2 | case LOGICAL_PTYPE: { \ |
191 | 2 | if (src_physical_type == tparquet::Type::INT32) { \ |
192 | 0 | physical_converter.reset(new NumberToDecimal<TYPE_INT, LOGICAL_PTYPE>()); \ |
193 | 2 | } else { \ |
194 | 2 | physical_converter.reset(new NumberToDecimal<TYPE_BIGINT, LOGICAL_PTYPE>()); \ |
195 | 2 | } \ |
196 | 2 | break; \ |
197 | 2 | } |
198 | 2 | FOR_LOGICAL_DECIMAL_TYPES(DISPATCH) |
199 | 0 | #undef DISPATCH |
200 | 0 | default: |
201 | 0 | physical_converter = |
202 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
203 | 2 | } |
204 | 2 | } else { |
205 | 0 | physical_converter = |
206 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
207 | 0 | } |
208 | 4 | } |
209 | | |
210 | | std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_converter( |
211 | | const FieldSchema* field_schema, DataTypePtr src_logical_type, |
212 | 223 | const DataTypePtr& dst_logical_type, const cctz::time_zone* ctz, bool is_dict_filter) { |
213 | 223 | std::unique_ptr<ConvertParams> convert_params = std::make_unique<ConvertParams>(); |
214 | 223 | const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema; |
215 | 223 | convert_params->init(field_schema, ctz); |
216 | 223 | tparquet::Type::type src_physical_type = parquet_schema.type; |
217 | 223 | std::unique_ptr<PhysicalToLogicalConverter> physical_converter; |
218 | 223 | if (is_dict_filter) { |
219 | 0 | src_physical_type = tparquet::Type::INT32; |
220 | 0 | src_logical_type = DataTypeFactory::instance().create_data_type( |
221 | 0 | PrimitiveType::TYPE_INT, dst_logical_type->is_nullable()); |
222 | 0 | } |
223 | 223 | PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type(); |
224 | | |
225 | 223 | if (field_schema->is_type_compatibility) { |
226 | 0 | if (src_logical_primitive == TYPE_SMALLINT) { |
227 | 0 | physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_SMALLINT>>(); |
228 | 0 | } else if (src_logical_primitive == TYPE_INT) { |
229 | 0 | physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_INT>>(); |
230 | 0 | } else if (src_logical_primitive == TYPE_BIGINT) { |
231 | 0 | physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_BIGINT>>(); |
232 | 0 | } else if (src_logical_primitive == TYPE_LARGEINT) { |
233 | 0 | physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_LARGEINT>>(); |
234 | 0 | } else { |
235 | 0 | physical_converter = |
236 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
237 | 0 | } |
238 | 223 | } else if (is_parquet_native_type(src_logical_primitive)) { |
239 | 187 | bool is_string_logical_type = is_string_type(src_logical_primitive); |
240 | 187 | if (is_string_logical_type && src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { |
241 | | // for FixedSizeBinary |
242 | 1 | physical_converter = |
243 | 1 | std::make_unique<FixedSizeBinaryConverter>(parquet_schema.type_length); |
244 | 186 | } else if (src_logical_primitive == TYPE_FLOAT && |
245 | 186 | src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY && |
246 | 186 | parquet_schema.logicalType.__isset.FLOAT16) { |
247 | 0 | physical_converter = |
248 | 0 | std::make_unique<Float16PhysicalConverter>(parquet_schema.type_length); |
249 | 186 | } else { |
250 | 186 | physical_converter = std::make_unique<ConsistentPhysicalConverter>(); |
251 | 186 | } |
252 | 187 | } else if (src_logical_primitive == TYPE_TINYINT) { |
253 | 10 | physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_TINYINT>>(); |
254 | 26 | } else if (src_logical_primitive == TYPE_SMALLINT) { |
255 | 7 | physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_SMALLINT>>(); |
256 | 19 | } else if (is_decimal_type(src_logical_primitive)) { |
257 | 4 | get_decimal_converter(field_schema, src_logical_type, dst_logical_type, |
258 | 4 | convert_params.get(), physical_converter); |
259 | 15 | } else if (src_logical_primitive == TYPE_DATEV2) { |
260 | 7 | physical_converter = std::make_unique<Int32ToDate>(); |
261 | 8 | } else if (src_logical_primitive == TYPE_DATETIMEV2) { |
262 | 5 | if (src_physical_type == tparquet::Type::INT96) { |
263 | | // int96 only stores nanoseconds in standard parquet file |
264 | 0 | convert_params->reset_time_scale_if_missing(9); |
265 | 0 | physical_converter = std::make_unique<Int96toTimestamp>(); |
266 | 5 | } else if (src_physical_type == tparquet::Type::INT64) { |
267 | 5 | convert_params->reset_time_scale_if_missing(src_logical_type->get_scale()); |
268 | 5 | physical_converter = std::make_unique<Int64ToTimestamp>(); |
269 | 5 | } else { |
270 | 0 | physical_converter = |
271 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
272 | 0 | } |
273 | 5 | } else if (src_logical_primitive == TYPE_VARBINARY) { |
274 | 3 | if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { |
275 | 1 | DCHECK(parquet_schema.logicalType.__isset.UUID) << parquet_schema.name; |
276 | 1 | physical_converter = |
277 | 1 | std::make_unique<UUIDVarBinaryConverter>(parquet_schema.type_length); |
278 | 2 | } else { |
279 | 2 | DCHECK(src_physical_type == tparquet::Type::BYTE_ARRAY) << src_physical_type; |
280 | 2 | physical_converter = std::make_unique<ConsistentPhysicalConverter>(); |
281 | 2 | } |
282 | 3 | } else if (src_logical_primitive == TYPE_TIMESTAMPTZ) { |
283 | 0 | if (src_physical_type == tparquet::Type::INT96) { |
284 | 0 | physical_converter = std::make_unique<Int96toTimestampTz>(); |
285 | 0 | } else if (src_physical_type == tparquet::Type::INT64) { |
286 | 0 | DCHECK(src_physical_type == tparquet::Type::INT64) << src_physical_type; |
287 | 0 | DCHECK(parquet_schema.logicalType.__isset.TIMESTAMP) << parquet_schema.name; |
288 | 0 | physical_converter = std::make_unique<Int64ToTimestampTz>(); |
289 | 0 | } else { |
290 | 0 | physical_converter = |
291 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
292 | 0 | } |
293 | 0 | } else { |
294 | 0 | physical_converter = |
295 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
296 | 0 | } |
297 | | |
298 | 223 | if (physical_converter->support()) { |
299 | 223 | physical_converter->_convert_params = std::move(convert_params); |
300 | 223 | physical_converter->_logical_converter = converter::ColumnTypeConverter::get_converter( |
301 | 223 | src_logical_type, dst_logical_type, converter::FileFormat::PARQUET); |
302 | 223 | if (!physical_converter->_logical_converter->support()) { |
303 | 0 | physical_converter = std::make_unique<UnsupportedConverter>( |
304 | 0 | "Unsupported type change: " + |
305 | 0 | physical_converter->_logical_converter->get_error_msg()); |
306 | 0 | } |
307 | 223 | } |
308 | 223 | return physical_converter; |
309 | 223 | } |
310 | | |
311 | | } // namespace doris::parquet |