Coverage Report

Created: 2026-04-10 04:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/parquet_column_convert.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/parquet_column_convert.h"
19
20
#include <cctz/time_zone.h>
21
#include <glog/logging.h>
22
23
#include "common/cast_set.h"
24
#include "core/column/column_nullable.h"
25
#include "core/data_type/data_type_nullable.h"
26
#include "core/data_type/define_primitive_type.h"
27
#include "core/data_type/primitive_type.h"
28
29
namespace doris::parquet {
30
// Out-of-class definition of the static member ConvertParams::utc0, initialized to the
// UTC time zone returned by cctz::utc_time_zone().
const cctz::time_zone ConvertParams::utc0 = cctz::utc_time_zone();
31
32
// X-macro: expands M once for each of the four decimal primitive types listed below.
// get_decimal_converter() uses it to generate one `case` label per decimal type.
// TYPE_DECIMALV2 is not in this list.
#define FOR_LOGICAL_DECIMAL_TYPES(M) \
33
0
    M(TYPE_DECIMAL32)                \
34
4
    M(TYPE_DECIMAL64)                \
35
4
    M(TYPE_DECIMAL128I)              \
36
0
    M(TYPE_DECIMAL256)
37
38
216
/// Reports whether `type` belongs to the set this converter treats as parquet-native:
/// TYPE_BOOLEAN, TYPE_INT, TYPE_BIGINT, TYPE_FLOAT, TYPE_DOUBLE, TYPE_STRING, TYPE_CHAR
/// or TYPE_VARCHAR.
///
/// @param type  Primitive type to classify.
/// @return true for the eight types listed above, false for every other value.
bool PhysicalToLogicalConverter::is_parquet_native_type(PrimitiveType type) {
    const bool is_bool_or_number = type == TYPE_BOOLEAN || type == TYPE_INT ||
                                   type == TYPE_BIGINT || type == TYPE_FLOAT ||
                                   type == TYPE_DOUBLE;
    const bool is_text = type == TYPE_STRING || type == TYPE_CHAR || type == TYPE_VARCHAR;
    return is_bool_or_number || is_text;
}
53
54
19
/// Reports whether `type` is one of the decimal primitive types: TYPE_DECIMAL32,
/// TYPE_DECIMAL64, TYPE_DECIMAL128I, TYPE_DECIMAL256 or TYPE_DECIMALV2.
///
/// @param type  Primitive type to classify.
/// @return true for the five decimal types listed above, false otherwise.
bool PhysicalToLogicalConverter::is_decimal_type(doris::PrimitiveType type) {
    const bool is_v3_decimal = type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 ||
                               type == TYPE_DECIMAL128I || type == TYPE_DECIMAL256;
    return is_v3_decimal || type == TYPE_DECIMALV2;
}
66
67
/// Returns the column that physical parquet values should be written into.
///
/// When no conversion is required (type-compatibility mode is off and both this converter
/// and the logical converter report is_consistent()), the destination column itself is
/// returned and the source type is cached on first use. Otherwise a cached intermediate
/// column matching the parquet physical type is created on first use, cleared, and returned;
/// for a nullable destination it is wrapped in a ColumnNullable that reuses the destination's
/// null map.
///
/// @param src_physical_type   Parquet physical type; overridden to INT32 when is_dict_filter.
/// @param src_logical_type    Logical type from the file; overridden to TYPE_INT (with the
///                            destination's nullability) when is_dict_filter.
/// @param dst_logical_column  Destination column.
/// @param dst_logical_type    Destination type; its nullability drives the wrapping.
/// @param is_dict_filter      Whether the column is read for dictionary filtering.
/// @return The destination column, or the cached physical column (possibly nullable-wrapped).
ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type src_physical_type,
                                                          DataTypePtr src_logical_type,
                                                          ColumnPtr& dst_logical_column,
                                                          const DataTypePtr& dst_logical_type,
                                                          bool is_dict_filter) {
    if (is_dict_filter) {
        src_logical_type = DataTypeFactory::instance().create_data_type(
                PrimitiveType::TYPE_INT, dst_logical_type->is_nullable());
        src_physical_type = tparquet::Type::INT32;
    }

    // Same short-circuit order as testing "!compat && consistent && logical-consistent".
    const bool needs_conversion = _convert_params->is_type_compatibility || !is_consistent() ||
                                  !_logical_converter->is_consistent();
    if (!needs_conversion) {
        if (_cached_src_physical_type == nullptr) {
            if (dst_logical_type->is_nullable()) {
                _cached_src_physical_type = make_nullable(src_logical_type);
            } else {
                _cached_src_physical_type = remove_nullable(src_logical_type);
            }
        }
        return dst_logical_column;
    }

    const bool dst_is_nullable = dst_logical_type->is_nullable();

    if (!_cached_src_physical_column) {
        // Pick the data type used to hold the raw physical values.
        switch (src_physical_type) {
        case tparquet::Type::type::BOOLEAN:
        case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY:
            _cached_src_physical_type = std::make_shared<DataTypeUInt8>();
            break;
        case tparquet::Type::type::INT32:
            _cached_src_physical_type = std::make_shared<DataTypeInt32>();
            break;
        case tparquet::Type::type::INT64:
            _cached_src_physical_type = std::make_shared<DataTypeInt64>();
            break;
        case tparquet::Type::type::INT96:
            _cached_src_physical_type = std::make_shared<DataTypeInt8>();
            break;
        case tparquet::Type::type::FLOAT:
            _cached_src_physical_type = std::make_shared<DataTypeFloat32>();
            break;
        case tparquet::Type::type::DOUBLE:
            _cached_src_physical_type = std::make_shared<DataTypeFloat64>();
            break;
        case tparquet::Type::type::BYTE_ARRAY:
            _cached_src_physical_type = std::make_shared<DataTypeString>();
            break;
        }
        // The column is created from the non-nullable type; only the cached type is wrapped.
        _cached_src_physical_column = _cached_src_physical_type->create_column();
        if (dst_is_nullable) {
            _cached_src_physical_type = make_nullable(_cached_src_physical_type);
        }
    }

    // Drop whatever the previous call left in the cached column.
    _cached_src_physical_column->assume_mutable()->clear();

    if (!dst_is_nullable) {
        return _cached_src_physical_column;
    }

    // Share the destination's null map with the intermediate column instead of copying it.
    // Note carried over from the original author: this is tricky because it goes through
    // `get_null_map_column_ptr()`, which sets `_need_update_has_null = true`, while some
    // operations such as agg call `has_null()` to set `_need_update_has_null = false`.
    auto* nullable_dst = assert_cast<const ColumnNullable*>(dst_logical_column.get());
    return ColumnNullable::create(_cached_src_physical_column,
                                  nullable_dst->get_null_map_column_ptr());
}
134
135
static void get_decimal_converter(const FieldSchema* field_schema, DataTypePtr src_logical_type,
136
                                  const DataTypePtr& dst_logical_type,
137
                                  ConvertParams* convert_params,
138
4
                                  std::unique_ptr<PhysicalToLogicalConverter>& physical_converter) {
139
4
    const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema;
140
4
    if (is_decimal(dst_logical_type->get_primitive_type())) {
141
4
        src_logical_type = create_decimal(parquet_schema.precision, parquet_schema.scale, false);
142
4
    }
143
144
4
    tparquet::Type::type src_physical_type = parquet_schema.type;
145
4
    PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type();
146
147
4
    if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
148
2
        switch (src_logical_primitive) {
149
0
#define DISPATCH(LOGICAL_PTYPE)                                                     \
150
2
    case LOGICAL_PTYPE: {                                                           \
151
2
        physical_converter.reset(                                                   \
152
2
                new FixedSizeToDecimal<LOGICAL_PTYPE>(parquet_schema.type_length)); \
153
2
        break;                                                                      \
154
2
    }
155
2
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
156
0
#undef DISPATCH
157
0
        default:
158
0
            physical_converter =
159
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
160
2
        }
161
2
    } else if (src_physical_type == tparquet::Type::BYTE_ARRAY) {
162
0
        switch (src_logical_primitive) {
163
0
#define DISPATCH(LOGICAL_PTYPE)                                         \
164
0
    case LOGICAL_PTYPE: {                                               \
165
0
        physical_converter.reset(new StringToDecimal<LOGICAL_PTYPE>()); \
166
0
        break;                                                          \
167
0
    }
168
0
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
169
0
#undef DISPATCH
170
0
        default:
171
0
            physical_converter =
172
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
173
0
        }
174
2
    } else if (src_physical_type == tparquet::Type::INT32 ||
175
2
               src_physical_type == tparquet::Type::INT64) {
176
2
        switch (src_logical_primitive) {
177
0
#define DISPATCH(LOGICAL_PTYPE)                                                          \
178
2
    case LOGICAL_PTYPE: {                                                                \
179
2
        if (src_physical_type == tparquet::Type::INT32) {                                \
180
0
            physical_converter.reset(new NumberToDecimal<TYPE_INT, LOGICAL_PTYPE>());    \
181
2
        } else {                                                                         \
182
2
            physical_converter.reset(new NumberToDecimal<TYPE_BIGINT, LOGICAL_PTYPE>()); \
183
2
        }                                                                                \
184
2
        break;                                                                           \
185
2
    }
186
2
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
187
0
#undef DISPATCH
188
0
        default:
189
0
            physical_converter =
190
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
191
2
        }
192
2
    } else {
193
0
        physical_converter =
194
0
                std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
195
0
    }
196
4
}
197
198
std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_converter(
199
        const FieldSchema* field_schema, DataTypePtr src_logical_type,
200
216
        const DataTypePtr& dst_logical_type, const cctz::time_zone* ctz, bool is_dict_filter) {
201
216
    std::unique_ptr<ConvertParams> convert_params = std::make_unique<ConvertParams>();
202
216
    const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema;
203
216
    convert_params->init(field_schema, ctz);
204
216
    tparquet::Type::type src_physical_type = parquet_schema.type;
205
216
    std::unique_ptr<PhysicalToLogicalConverter> physical_converter;
206
216
    if (is_dict_filter) {
207
0
        src_physical_type = tparquet::Type::INT32;
208
0
        src_logical_type = DataTypeFactory::instance().create_data_type(
209
0
                PrimitiveType::TYPE_INT, dst_logical_type->is_nullable());
210
0
    }
211
216
    PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type();
212
213
216
    if (field_schema->is_type_compatibility) {
214
0
        if (src_logical_primitive == TYPE_SMALLINT) {
215
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_SMALLINT>>();
216
0
        } else if (src_logical_primitive == TYPE_INT) {
217
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_INT>>();
218
0
        } else if (src_logical_primitive == TYPE_BIGINT) {
219
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_BIGINT>>();
220
0
        } else if (src_logical_primitive == TYPE_LARGEINT) {
221
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_LARGEINT>>();
222
0
        } else {
223
0
            physical_converter =
224
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
225
0
        }
226
216
    } else if (is_parquet_native_type(src_logical_primitive)) {
227
180
        bool is_string_logical_type = is_string_type(src_logical_primitive);
228
180
        if (is_string_logical_type && src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
229
            // for FixedSizeBinary
230
0
            physical_converter =
231
0
                    std::make_unique<FixedSizeBinaryConverter>(parquet_schema.type_length);
232
180
        } else if (src_logical_primitive == TYPE_FLOAT &&
233
180
                   src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY &&
234
180
                   parquet_schema.logicalType.__isset.FLOAT16) {
235
0
            physical_converter =
236
0
                    std::make_unique<Float16PhysicalConverter>(parquet_schema.type_length);
237
180
        } else {
238
180
            physical_converter = std::make_unique<ConsistentPhysicalConverter>();
239
180
        }
240
180
    } else if (src_logical_primitive == TYPE_TINYINT) {
241
10
        physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_TINYINT>>();
242
26
    } else if (src_logical_primitive == TYPE_SMALLINT) {
243
7
        physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_SMALLINT>>();
244
19
    } else if (is_decimal_type(src_logical_primitive)) {
245
4
        get_decimal_converter(field_schema, src_logical_type, dst_logical_type,
246
4
                              convert_params.get(), physical_converter);
247
15
    } else if (src_logical_primitive == TYPE_DATEV2) {
248
7
        physical_converter = std::make_unique<Int32ToDate>();
249
8
    } else if (src_logical_primitive == TYPE_DATETIMEV2) {
250
5
        if (src_physical_type == tparquet::Type::INT96) {
251
            // int96 only stores nanoseconds in standard parquet file
252
0
            convert_params->reset_time_scale_if_missing(9);
253
0
            physical_converter = std::make_unique<Int96toTimestamp>();
254
5
        } else if (src_physical_type == tparquet::Type::INT64) {
255
5
            convert_params->reset_time_scale_if_missing(src_logical_type->get_scale());
256
5
            physical_converter = std::make_unique<Int64ToTimestamp>();
257
5
        } else {
258
0
            physical_converter =
259
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
260
0
        }
261
5
    } else if (src_logical_primitive == TYPE_VARBINARY) {
262
3
        if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
263
1
            DCHECK(parquet_schema.logicalType.__isset.UUID) << parquet_schema.name;
264
1
            physical_converter =
265
1
                    std::make_unique<UUIDVarBinaryConverter>(parquet_schema.type_length);
266
2
        } else {
267
2
            DCHECK(src_physical_type == tparquet::Type::BYTE_ARRAY) << src_physical_type;
268
2
            physical_converter = std::make_unique<ConsistentPhysicalConverter>();
269
2
        }
270
3
    } else if (src_logical_primitive == TYPE_TIMESTAMPTZ) {
271
0
        if (src_physical_type == tparquet::Type::INT96) {
272
0
            physical_converter = std::make_unique<Int96toTimestampTz>();
273
0
        } else if (src_physical_type == tparquet::Type::INT64) {
274
0
            DCHECK(src_physical_type == tparquet::Type::INT64) << src_physical_type;
275
0
            DCHECK(parquet_schema.logicalType.__isset.TIMESTAMP) << parquet_schema.name;
276
0
            physical_converter = std::make_unique<Int64ToTimestampTz>();
277
0
        } else {
278
0
            physical_converter =
279
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
280
0
        }
281
0
    } else {
282
0
        physical_converter =
283
0
                std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
284
0
    }
285
286
216
    if (physical_converter->support()) {
287
216
        physical_converter->_convert_params = std::move(convert_params);
288
216
        physical_converter->_logical_converter = converter::ColumnTypeConverter::get_converter(
289
216
                src_logical_type, dst_logical_type, converter::FileFormat::PARQUET);
290
216
        if (!physical_converter->_logical_converter->support()) {
291
0
            physical_converter = std::make_unique<UnsupportedConverter>(
292
0
                    "Unsupported type change: " +
293
0
                    physical_converter->_logical_converter->get_error_msg());
294
0
        }
295
216
    }
296
216
    return physical_converter;
297
216
}
298
299
} // namespace doris::parquet