be/src/format/parquet/parquet_column_convert.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/parquet/parquet_column_convert.h" |
19 | | |
20 | | #include <cctz/time_zone.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include "common/cast_set.h" |
24 | | #include "core/column/column_nullable.h" |
25 | | #include "core/data_type/data_type_nullable.h" |
26 | | #include "core/data_type/define_primitive_type.h" |
27 | | #include "core/data_type/primitive_type.h" |
28 | | |
29 | | namespace doris::parquet { |
30 | | #include "common/compile_check_begin.h" |
31 | | const cctz::time_zone ConvertParams::utc0 = cctz::utc_time_zone(); |
32 | | |
33 | | #define FOR_LOGICAL_DECIMAL_TYPES(M) \ |
34 | 0 | M(TYPE_DECIMAL32) \ |
35 | 4 | M(TYPE_DECIMAL64) \ |
36 | 4 | M(TYPE_DECIMAL128I) \ |
37 | 0 | M(TYPE_DECIMAL256) |
38 | | |
39 | 212 | bool PhysicalToLogicalConverter::is_parquet_native_type(PrimitiveType type) { |
40 | 212 | switch (type) { |
41 | 12 | case TYPE_BOOLEAN: |
42 | 81 | case TYPE_INT: |
43 | 104 | case TYPE_BIGINT: |
44 | 133 | case TYPE_FLOAT: |
45 | 152 | case TYPE_DOUBLE: |
46 | 178 | case TYPE_STRING: |
47 | 178 | case TYPE_CHAR: |
48 | 178 | case TYPE_VARCHAR: |
49 | 178 | return true; |
50 | 34 | default: |
51 | 34 | return false; |
52 | 212 | } |
53 | 212 | } |
54 | | |
55 | 17 | bool PhysicalToLogicalConverter::is_decimal_type(doris::PrimitiveType type) { |
56 | 17 | switch (type) { |
57 | 0 | case TYPE_DECIMAL32: |
58 | 4 | case TYPE_DECIMAL64: |
59 | 4 | case TYPE_DECIMAL128I: |
60 | 4 | case TYPE_DECIMAL256: |
61 | 4 | case TYPE_DECIMALV2: |
62 | 4 | return true; |
63 | 13 | default: |
64 | 13 | return false; |
65 | 17 | } |
66 | 17 | } |
67 | | |
68 | | ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type src_physical_type, |
69 | | DataTypePtr src_logical_type, |
70 | | ColumnPtr& dst_logical_column, |
71 | | const DataTypePtr& dst_logical_type, |
72 | 222 | bool is_dict_filter) { |
73 | 222 | if (is_dict_filter) { |
74 | 0 | src_physical_type = tparquet::Type::INT32; |
75 | 0 | src_logical_type = DataTypeFactory::instance().create_data_type( |
76 | 0 | PrimitiveType::TYPE_INT, dst_logical_type->is_nullable()); |
77 | 0 | } |
78 | | |
79 | 222 | if (!_convert_params->is_type_compatibility && is_consistent() && |
80 | 222 | _logical_converter->is_consistent()) { |
81 | 179 | if (_cached_src_physical_type == nullptr) { |
82 | 105 | _cached_src_physical_type = dst_logical_type->is_nullable() |
83 | 105 | ? make_nullable(src_logical_type) |
84 | 105 | : remove_nullable(src_logical_type); |
85 | 105 | } |
86 | 179 | return dst_logical_column; |
87 | 179 | } |
88 | | |
89 | 43 | if (!_cached_src_physical_column) { |
90 | 32 | switch (src_physical_type) { |
91 | 0 | case tparquet::Type::type::BOOLEAN: |
92 | 0 | _cached_src_physical_type = std::make_shared<DataTypeUInt8>(); |
93 | 0 | break; |
94 | 21 | case tparquet::Type::type::INT32: |
95 | 21 | _cached_src_physical_type = std::make_shared<DataTypeInt32>(); |
96 | 21 | break; |
97 | 6 | case tparquet::Type::type::INT64: |
98 | 6 | _cached_src_physical_type = std::make_shared<DataTypeInt64>(); |
99 | 6 | break; |
100 | 0 | case tparquet::Type::type::FLOAT: |
101 | 0 | _cached_src_physical_type = std::make_shared<DataTypeFloat32>(); |
102 | 0 | break; |
103 | 0 | case tparquet::Type::type::DOUBLE: |
104 | 0 | _cached_src_physical_type = std::make_shared<DataTypeFloat64>(); |
105 | 0 | break; |
106 | 4 | case tparquet::Type::type::BYTE_ARRAY: |
107 | 4 | _cached_src_physical_type = std::make_shared<DataTypeString>(); |
108 | 4 | break; |
109 | 1 | case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY: |
110 | 1 | _cached_src_physical_type = std::make_shared<DataTypeUInt8>(); |
111 | 1 | break; |
112 | 0 | case tparquet::Type::type::INT96: |
113 | 0 | _cached_src_physical_type = std::make_shared<DataTypeInt8>(); |
114 | 0 | break; |
115 | 32 | } |
116 | 32 | _cached_src_physical_column = _cached_src_physical_type->create_column(); |
117 | 32 | if (dst_logical_type->is_nullable()) { |
118 | 32 | _cached_src_physical_type = make_nullable(_cached_src_physical_type); |
119 | 32 | } |
120 | 32 | } |
121 | | // remove the old cached data |
122 | 43 | _cached_src_physical_column->assume_mutable()->clear(); |
123 | | |
124 | 43 | if (dst_logical_type->is_nullable()) { |
125 | | // In order to share null map between parquet converted src column and dst column to avoid copying. It is very tricky that will |
126 | | // call mutable function `doris_nullable_column->get_null_map_column_ptr()` which will set `_need_update_has_null = true`. |
127 | | // Because some operations such as agg will call `has_null()` to set `_need_update_has_null = false`. |
128 | 43 | auto* doris_nullable_column = assert_cast<const ColumnNullable*>(dst_logical_column.get()); |
129 | 43 | return ColumnNullable::create(_cached_src_physical_column, |
130 | 43 | doris_nullable_column->get_null_map_column_ptr()); |
131 | 43 | } |
132 | | |
133 | 0 | return _cached_src_physical_column; |
134 | 43 | } |
135 | | |
136 | | static void get_decimal_converter(const FieldSchema* field_schema, DataTypePtr src_logical_type, |
137 | | const DataTypePtr& dst_logical_type, |
138 | | ConvertParams* convert_params, |
139 | 4 | std::unique_ptr<PhysicalToLogicalConverter>& physical_converter) { |
140 | 4 | const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema; |
141 | 4 | if (is_decimal(dst_logical_type->get_primitive_type())) { |
142 | 4 | src_logical_type = create_decimal(parquet_schema.precision, parquet_schema.scale, false); |
143 | 4 | } |
144 | | |
145 | 4 | tparquet::Type::type src_physical_type = parquet_schema.type; |
146 | 4 | PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type(); |
147 | | |
148 | 4 | if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { |
149 | 2 | switch (src_logical_primitive) { |
150 | 0 | #define DISPATCH(LOGICAL_PTYPE) \ |
151 | 2 | case LOGICAL_PTYPE: { \ |
152 | 2 | physical_converter.reset( \ |
153 | 2 | new FixedSizeToDecimal<LOGICAL_PTYPE>(parquet_schema.type_length)); \ |
154 | 2 | break; \ |
155 | 2 | } |
156 | 2 | FOR_LOGICAL_DECIMAL_TYPES(DISPATCH) |
157 | 0 | #undef DISPATCH |
158 | 0 | default: |
159 | 0 | physical_converter = |
160 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
161 | 2 | } |
162 | 2 | } else if (src_physical_type == tparquet::Type::BYTE_ARRAY) { |
163 | 0 | switch (src_logical_primitive) { |
164 | 0 | #define DISPATCH(LOGICAL_PTYPE) \ |
165 | 0 | case LOGICAL_PTYPE: { \ |
166 | 0 | physical_converter.reset(new StringToDecimal<LOGICAL_PTYPE>()); \ |
167 | 0 | break; \ |
168 | 0 | } |
169 | 0 | FOR_LOGICAL_DECIMAL_TYPES(DISPATCH) |
170 | 0 | #undef DISPATCH |
171 | 0 | default: |
172 | 0 | physical_converter = |
173 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
174 | 0 | } |
175 | 2 | } else if (src_physical_type == tparquet::Type::INT32 || |
176 | 2 | src_physical_type == tparquet::Type::INT64) { |
177 | 2 | switch (src_logical_primitive) { |
178 | 0 | #define DISPATCH(LOGICAL_PTYPE) \ |
179 | 2 | case LOGICAL_PTYPE: { \ |
180 | 2 | if (src_physical_type == tparquet::Type::INT32) { \ |
181 | 0 | physical_converter.reset(new NumberToDecimal<TYPE_INT, LOGICAL_PTYPE>()); \ |
182 | 2 | } else { \ |
183 | 2 | physical_converter.reset(new NumberToDecimal<TYPE_BIGINT, LOGICAL_PTYPE>()); \ |
184 | 2 | } \ |
185 | 2 | break; \ |
186 | 2 | } |
187 | 2 | FOR_LOGICAL_DECIMAL_TYPES(DISPATCH) |
188 | 0 | #undef DISPATCH |
189 | 0 | default: |
190 | 0 | physical_converter = |
191 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
192 | 2 | } |
193 | 2 | } else { |
194 | 0 | physical_converter = |
195 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
196 | 0 | } |
197 | 4 | } |
198 | | |
199 | | std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_converter( |
200 | | const FieldSchema* field_schema, DataTypePtr src_logical_type, |
201 | 212 | const DataTypePtr& dst_logical_type, const cctz::time_zone* ctz, bool is_dict_filter) { |
202 | 212 | std::unique_ptr<ConvertParams> convert_params = std::make_unique<ConvertParams>(); |
203 | 212 | const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema; |
204 | 212 | convert_params->init(field_schema, ctz); |
205 | 212 | tparquet::Type::type src_physical_type = parquet_schema.type; |
206 | 212 | std::unique_ptr<PhysicalToLogicalConverter> physical_converter; |
207 | 212 | if (is_dict_filter) { |
208 | 0 | src_physical_type = tparquet::Type::INT32; |
209 | 0 | src_logical_type = DataTypeFactory::instance().create_data_type( |
210 | 0 | PrimitiveType::TYPE_INT, dst_logical_type->is_nullable()); |
211 | 0 | } |
212 | 212 | PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type(); |
213 | | |
214 | 212 | if (field_schema->is_type_compatibility) { |
215 | 0 | if (src_logical_primitive == TYPE_SMALLINT) { |
216 | 0 | physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_SMALLINT>>(); |
217 | 0 | } else if (src_logical_primitive == TYPE_INT) { |
218 | 0 | physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_INT>>(); |
219 | 0 | } else if (src_logical_primitive == TYPE_BIGINT) { |
220 | 0 | physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_BIGINT>>(); |
221 | 0 | } else if (src_logical_primitive == TYPE_LARGEINT) { |
222 | 0 | physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_LARGEINT>>(); |
223 | 0 | } else { |
224 | 0 | physical_converter = |
225 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
226 | 0 | } |
227 | 212 | } else if (is_parquet_native_type(src_logical_primitive)) { |
228 | 178 | bool is_string_logical_type = is_string_type(src_logical_primitive); |
229 | 178 | if (is_string_logical_type && src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { |
230 | | // for FixedSizeBinary |
231 | 0 | physical_converter = |
232 | 0 | std::make_unique<FixedSizeBinaryConverter>(parquet_schema.type_length); |
233 | 178 | } else if (src_logical_primitive == TYPE_FLOAT && |
234 | 178 | src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY && |
235 | 178 | parquet_schema.logicalType.__isset.FLOAT16) { |
236 | 0 | physical_converter = |
237 | 0 | std::make_unique<Float16PhysicalConverter>(parquet_schema.type_length); |
238 | 178 | } else { |
239 | 178 | physical_converter = std::make_unique<ConsistentPhysicalConverter>(); |
240 | 178 | } |
241 | 178 | } else if (src_logical_primitive == TYPE_TINYINT) { |
242 | 10 | physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_TINYINT>>(); |
243 | 24 | } else if (src_logical_primitive == TYPE_SMALLINT) { |
244 | 7 | physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_SMALLINT>>(); |
245 | 17 | } else if (is_decimal_type(src_logical_primitive)) { |
246 | 4 | get_decimal_converter(field_schema, src_logical_type, dst_logical_type, |
247 | 4 | convert_params.get(), physical_converter); |
248 | 13 | } else if (src_logical_primitive == TYPE_DATEV2) { |
249 | 5 | physical_converter = std::make_unique<Int32ToDate>(); |
250 | 8 | } else if (src_logical_primitive == TYPE_DATETIMEV2) { |
251 | 5 | if (src_physical_type == tparquet::Type::INT96) { |
252 | | // int96 only stores nanoseconds in standard parquet file |
253 | 0 | convert_params->reset_time_scale_if_missing(9); |
254 | 0 | physical_converter = std::make_unique<Int96toTimestamp>(); |
255 | 5 | } else if (src_physical_type == tparquet::Type::INT64) { |
256 | 5 | convert_params->reset_time_scale_if_missing(src_logical_type->get_scale()); |
257 | 5 | physical_converter = std::make_unique<Int64ToTimestamp>(); |
258 | 5 | } else { |
259 | 0 | physical_converter = |
260 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
261 | 0 | } |
262 | 5 | } else if (src_logical_primitive == TYPE_VARBINARY) { |
263 | 3 | if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { |
264 | 1 | DCHECK(parquet_schema.logicalType.__isset.UUID) << parquet_schema.name; |
265 | 1 | physical_converter = |
266 | 1 | std::make_unique<UUIDVarBinaryConverter>(parquet_schema.type_length); |
267 | 2 | } else { |
268 | 2 | DCHECK(src_physical_type == tparquet::Type::BYTE_ARRAY) << src_physical_type; |
269 | 2 | physical_converter = std::make_unique<ConsistentPhysicalConverter>(); |
270 | 2 | } |
271 | 3 | } else if (src_logical_primitive == TYPE_TIMESTAMPTZ) { |
272 | 0 | if (src_physical_type == tparquet::Type::INT96) { |
273 | 0 | physical_converter = std::make_unique<Int96toTimestampTz>(); |
274 | 0 | } else if (src_physical_type == tparquet::Type::INT64) { |
275 | 0 | DCHECK(src_physical_type == tparquet::Type::INT64) << src_physical_type; |
276 | 0 | DCHECK(parquet_schema.logicalType.__isset.TIMESTAMP) << parquet_schema.name; |
277 | 0 | physical_converter = std::make_unique<Int64ToTimestampTz>(); |
278 | 0 | } else { |
279 | 0 | physical_converter = |
280 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
281 | 0 | } |
282 | 0 | } else { |
283 | 0 | physical_converter = |
284 | 0 | std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type); |
285 | 0 | } |
286 | | |
287 | 212 | if (physical_converter->support()) { |
288 | 212 | physical_converter->_convert_params = std::move(convert_params); |
289 | 212 | physical_converter->_logical_converter = converter::ColumnTypeConverter::get_converter( |
290 | 212 | src_logical_type, dst_logical_type, converter::FileFormat::PARQUET); |
291 | 212 | if (!physical_converter->_logical_converter->support()) { |
292 | 0 | physical_converter = std::make_unique<UnsupportedConverter>( |
293 | 0 | "Unsupported type change: " + |
294 | 0 | physical_converter->_logical_converter->get_error_msg()); |
295 | 0 | } |
296 | 212 | } |
297 | 212 | return physical_converter; |
298 | 212 | } |
299 | | #include "common/compile_check_end.h" |
300 | | |
301 | | } // namespace doris::parquet |