Coverage Report

Created: 2026-05-18 03:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/parquet_column_convert.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/parquet_column_convert.h"
19
20
#include <cctz/time_zone.h>
21
#include <glog/logging.h>
22
23
#include "common/cast_set.h"
24
#include "core/column/column_fixed_length_object.h"
25
#include "core/column/column_nullable.h"
26
#include "core/data_type/data_type_fixed_length_object.h"
27
#include "core/data_type/data_type_nullable.h"
28
#include "core/data_type/define_primitive_type.h"
29
#include "core/data_type/primitive_type.h"
30
31
namespace doris::parquet {
32
// Out-of-class definition of the static member ConvertParams::utc0, initialized
// with the UTC time zone object returned by cctz::utc_time_zone().
const cctz::time_zone ConvertParams::utc0 {cctz::utc_time_zone()};
33
34
// X-macro: expands M(<type>) once for each of TYPE_DECIMAL32, TYPE_DECIMAL64,
// TYPE_DECIMAL128I and TYPE_DECIMAL256, in that order. It is used in this file to
// stamp out one `case` label per decimal primitive type inside a switch.
#define FOR_LOGICAL_DECIMAL_TYPES(M) \
    M(TYPE_DECIMAL32) M(TYPE_DECIMAL64) M(TYPE_DECIMAL128I) M(TYPE_DECIMAL256)
39
40
223
// Returns true when `type` is one of TYPE_BOOLEAN, TYPE_INT, TYPE_BIGINT,
// TYPE_FLOAT, TYPE_DOUBLE, TYPE_STRING, TYPE_CHAR or TYPE_VARCHAR, and false for
// every other primitive type.
bool PhysicalToLogicalConverter::is_parquet_native_type(PrimitiveType type) {
    const bool is_native_number = type == TYPE_BOOLEAN || type == TYPE_INT ||
                                  type == TYPE_BIGINT || type == TYPE_FLOAT ||
                                  type == TYPE_DOUBLE;
    const bool is_native_string =
            type == TYPE_STRING || type == TYPE_CHAR || type == TYPE_VARCHAR;
    return is_native_number || is_native_string;
}
55
56
19
// Returns true when `type` is any of the decimal primitive types
// (TYPE_DECIMAL32, TYPE_DECIMAL64, TYPE_DECIMAL128I, TYPE_DECIMAL256 or
// TYPE_DECIMALV2), and false otherwise.
bool PhysicalToLogicalConverter::is_decimal_type(doris::PrimitiveType type) {
    return type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 || type == TYPE_DECIMAL128I ||
           type == TYPE_DECIMAL256 || type == TYPE_DECIMALV2;
}
68
69
// Returns the column this converter hands out for the given source column.
//
// - When `is_dict_filter` is set, the source is treated as physical INT32 with
//   logical type TYPE_INT (nullable iff `dst_logical_type` is nullable).
// - When type compatibility is off and both this converter and the logical
//   converter report is_consistent(), `dst_logical_column` itself is returned.
//   On the first such call `_cached_src_physical_type` is set to the source
//   logical type with its nullability aligned to the destination.
// - Otherwise a cached column matching the parquet physical type is created on
//   first use, cleared on every call, and returned.
//
// Parameters:
//   src_physical_type   parquet physical type of the source column
//   src_logical_type    logical type of the source column
//   dst_logical_column  destination column, returned as-is on the consistent path
//   dst_logical_type    destination type; only its nullability is read here
//   is_dict_filter      overrides the source types as described above
ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type src_physical_type,
                                                          DataTypePtr src_logical_type,
                                                          ColumnPtr& dst_logical_column,
                                                          const DataTypePtr& dst_logical_type,
                                                          bool is_dict_filter) {
    using PhysicalType = tparquet::Type::type;

    if (is_dict_filter) {
        src_physical_type = PhysicalType::INT32;
        src_logical_type = DataTypeFactory::instance().create_data_type(
                PrimitiveType::TYPE_INT, dst_logical_type->is_nullable());
    }

    // No conversion step is required: hand back the destination column directly.
    const bool no_conversion_needed = !_convert_params->is_type_compatibility &&
                                      is_consistent() && _logical_converter->is_consistent();
    if (no_conversion_needed) {
        if (_cached_src_physical_type == nullptr) {
            if (dst_logical_type->is_nullable()) {
                _cached_src_physical_type = make_nullable(src_logical_type);
            } else {
                _cached_src_physical_type = remove_nullable(src_logical_type);
            }
        }
        return dst_logical_column;
    }

    if (!_cached_src_physical_column) {
        // Pick the data type that mirrors the parquet physical type.
        switch (src_physical_type) {
        case PhysicalType::BOOLEAN:
            _cached_src_physical_type = std::make_shared<DataTypeUInt8>();
            break;
        case PhysicalType::INT32:
            _cached_src_physical_type = std::make_shared<DataTypeInt32>();
            break;
        case PhysicalType::INT64:
            _cached_src_physical_type = std::make_shared<DataTypeInt64>();
            break;
        case PhysicalType::FLOAT:
            _cached_src_physical_type = std::make_shared<DataTypeFloat32>();
            break;
        case PhysicalType::DOUBLE:
            _cached_src_physical_type = std::make_shared<DataTypeFloat64>();
            break;
        case PhysicalType::BYTE_ARRAY:
            _cached_src_physical_type = std::make_shared<DataTypeString>();
            break;
        case PhysicalType::FIXED_LEN_BYTE_ARRAY:
            _cached_src_physical_type = std::make_shared<DataTypeFixedLengthObject>();
            break;
        case PhysicalType::INT96:
            // NOTE(review): INT96 is backed by an Int8 data type here, presumably so the
            // column acts as a raw byte buffer - confirm against the INT96 converters.
            _cached_src_physical_type = std::make_shared<DataTypeInt8>();
            break;
        }

        // Fixed-length byte arrays need the element width from the parquet schema, so
        // they are built explicitly instead of through the data type.
        const auto new_physical_column = [&]() -> MutableColumnPtr {
            if (src_physical_type == PhysicalType::FIXED_LEN_BYTE_ARRAY) {
                return ColumnFixedLengthObject::create(
                        _convert_params->field_schema->parquet_schema.type_length);
            }
            return _cached_src_physical_type->create_column();
        };

        if (dst_logical_type->is_nullable()) {
            // The nested column must be created before the cached type is wrapped as
            // nullable, because the lambda reads the unwrapped type.
            MutableColumnPtr nested_column = new_physical_column();
            _cached_src_physical_column =
                    ColumnNullable::create(std::move(nested_column), ColumnUInt8::create());
            _cached_src_physical_type = make_nullable(_cached_src_physical_type);
        } else {
            _cached_src_physical_column = new_physical_column();
        }
    }

    // Drop whatever the previous call left in the cached column.
    auto reusable_column = IColumn::mutate(std::move(_cached_src_physical_column));
    reusable_column->clear();
    _cached_src_physical_column = std::move(reusable_column);

    return _cached_src_physical_column;
}
146
147
static void get_decimal_converter(const FieldSchema* field_schema, DataTypePtr src_logical_type,
148
                                  const DataTypePtr& dst_logical_type,
149
                                  ConvertParams* convert_params,
150
4
                                  std::unique_ptr<PhysicalToLogicalConverter>& physical_converter) {
151
4
    const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema;
152
4
    if (is_decimal(dst_logical_type->get_primitive_type())) {
153
4
        src_logical_type = create_decimal(parquet_schema.precision, parquet_schema.scale, false);
154
4
    }
155
156
4
    tparquet::Type::type src_physical_type = parquet_schema.type;
157
4
    PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type();
158
159
4
    if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
160
2
        switch (src_logical_primitive) {
161
0
#define DISPATCH(LOGICAL_PTYPE)                                                     \
162
2
    case LOGICAL_PTYPE: {                                                           \
163
2
        physical_converter.reset(                                                   \
164
2
                new FixedSizeToDecimal<LOGICAL_PTYPE>(parquet_schema.type_length)); \
165
2
        break;                                                                      \
166
2
    }
167
2
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
168
0
#undef DISPATCH
169
0
        default:
170
0
            physical_converter =
171
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
172
2
        }
173
2
    } else if (src_physical_type == tparquet::Type::BYTE_ARRAY) {
174
0
        switch (src_logical_primitive) {
175
0
#define DISPATCH(LOGICAL_PTYPE)                                         \
176
0
    case LOGICAL_PTYPE: {                                               \
177
0
        physical_converter.reset(new StringToDecimal<LOGICAL_PTYPE>()); \
178
0
        break;                                                          \
179
0
    }
180
0
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
181
0
#undef DISPATCH
182
0
        default:
183
0
            physical_converter =
184
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
185
0
        }
186
2
    } else if (src_physical_type == tparquet::Type::INT32 ||
187
2
               src_physical_type == tparquet::Type::INT64) {
188
2
        switch (src_logical_primitive) {
189
0
#define DISPATCH(LOGICAL_PTYPE)                                                          \
190
2
    case LOGICAL_PTYPE: {                                                                \
191
2
        if (src_physical_type == tparquet::Type::INT32) {                                \
192
0
            physical_converter.reset(new NumberToDecimal<TYPE_INT, LOGICAL_PTYPE>());    \
193
2
        } else {                                                                         \
194
2
            physical_converter.reset(new NumberToDecimal<TYPE_BIGINT, LOGICAL_PTYPE>()); \
195
2
        }                                                                                \
196
2
        break;                                                                           \
197
2
    }
198
2
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
199
0
#undef DISPATCH
200
0
        default:
201
0
            physical_converter =
202
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
203
2
        }
204
2
    } else {
205
0
        physical_converter =
206
0
                std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
207
0
    }
208
4
}
209
210
std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_converter(
211
        const FieldSchema* field_schema, DataTypePtr src_logical_type,
212
223
        const DataTypePtr& dst_logical_type, const cctz::time_zone* ctz, bool is_dict_filter) {
213
223
    std::unique_ptr<ConvertParams> convert_params = std::make_unique<ConvertParams>();
214
223
    const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema;
215
223
    convert_params->init(field_schema, ctz);
216
223
    tparquet::Type::type src_physical_type = parquet_schema.type;
217
223
    std::unique_ptr<PhysicalToLogicalConverter> physical_converter;
218
223
    if (is_dict_filter) {
219
0
        src_physical_type = tparquet::Type::INT32;
220
0
        src_logical_type = DataTypeFactory::instance().create_data_type(
221
0
                PrimitiveType::TYPE_INT, dst_logical_type->is_nullable());
222
0
    }
223
223
    PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type();
224
225
223
    if (field_schema->is_type_compatibility) {
226
0
        if (src_logical_primitive == TYPE_SMALLINT) {
227
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_SMALLINT>>();
228
0
        } else if (src_logical_primitive == TYPE_INT) {
229
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_INT>>();
230
0
        } else if (src_logical_primitive == TYPE_BIGINT) {
231
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_BIGINT>>();
232
0
        } else if (src_logical_primitive == TYPE_LARGEINT) {
233
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_LARGEINT>>();
234
0
        } else {
235
0
            physical_converter =
236
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
237
0
        }
238
223
    } else if (is_parquet_native_type(src_logical_primitive)) {
239
187
        bool is_string_logical_type = is_string_type(src_logical_primitive);
240
187
        if (is_string_logical_type && src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
241
            // for FixedSizeBinary
242
1
            physical_converter =
243
1
                    std::make_unique<FixedSizeBinaryConverter>(parquet_schema.type_length);
244
186
        } else if (src_logical_primitive == TYPE_FLOAT &&
245
186
                   src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY &&
246
186
                   parquet_schema.logicalType.__isset.FLOAT16) {
247
0
            physical_converter =
248
0
                    std::make_unique<Float16PhysicalConverter>(parquet_schema.type_length);
249
186
        } else {
250
186
            physical_converter = std::make_unique<ConsistentPhysicalConverter>();
251
186
        }
252
187
    } else if (src_logical_primitive == TYPE_TINYINT) {
253
10
        physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_TINYINT>>();
254
26
    } else if (src_logical_primitive == TYPE_SMALLINT) {
255
7
        physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_SMALLINT>>();
256
19
    } else if (is_decimal_type(src_logical_primitive)) {
257
4
        get_decimal_converter(field_schema, src_logical_type, dst_logical_type,
258
4
                              convert_params.get(), physical_converter);
259
15
    } else if (src_logical_primitive == TYPE_DATEV2) {
260
7
        physical_converter = std::make_unique<Int32ToDate>();
261
8
    } else if (src_logical_primitive == TYPE_DATETIMEV2) {
262
5
        if (src_physical_type == tparquet::Type::INT96) {
263
            // int96 only stores nanoseconds in standard parquet file
264
0
            convert_params->reset_time_scale_if_missing(9);
265
0
            physical_converter = std::make_unique<Int96toTimestamp>();
266
5
        } else if (src_physical_type == tparquet::Type::INT64) {
267
5
            convert_params->reset_time_scale_if_missing(src_logical_type->get_scale());
268
5
            physical_converter = std::make_unique<Int64ToTimestamp>();
269
5
        } else {
270
0
            physical_converter =
271
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
272
0
        }
273
5
    } else if (src_logical_primitive == TYPE_VARBINARY) {
274
3
        if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
275
1
            DCHECK(parquet_schema.logicalType.__isset.UUID) << parquet_schema.name;
276
1
            physical_converter =
277
1
                    std::make_unique<UUIDVarBinaryConverter>(parquet_schema.type_length);
278
2
        } else {
279
2
            DCHECK(src_physical_type == tparquet::Type::BYTE_ARRAY) << src_physical_type;
280
2
            physical_converter = std::make_unique<ConsistentPhysicalConverter>();
281
2
        }
282
3
    } else if (src_logical_primitive == TYPE_TIMESTAMPTZ) {
283
0
        if (src_physical_type == tparquet::Type::INT96) {
284
0
            physical_converter = std::make_unique<Int96toTimestampTz>();
285
0
        } else if (src_physical_type == tparquet::Type::INT64) {
286
0
            DCHECK(src_physical_type == tparquet::Type::INT64) << src_physical_type;
287
0
            DCHECK(parquet_schema.logicalType.__isset.TIMESTAMP) << parquet_schema.name;
288
0
            physical_converter = std::make_unique<Int64ToTimestampTz>();
289
0
        } else {
290
0
            physical_converter =
291
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
292
0
        }
293
0
    } else {
294
0
        physical_converter =
295
0
                std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
296
0
    }
297
298
223
    if (physical_converter->support()) {
299
223
        physical_converter->_convert_params = std::move(convert_params);
300
223
        physical_converter->_logical_converter = converter::ColumnTypeConverter::get_converter(
301
223
                src_logical_type, dst_logical_type, converter::FileFormat::PARQUET);
302
223
        if (!physical_converter->_logical_converter->support()) {
303
0
            physical_converter = std::make_unique<UnsupportedConverter>(
304
0
                    "Unsupported type change: " +
305
0
                    physical_converter->_logical_converter->get_error_msg());
306
0
        }
307
223
    }
308
223
    return physical_converter;
309
223
}
310
311
} // namespace doris::parquet