Coverage Report

Created: 2026-03-15 01:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/parquet_column_convert.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/parquet_column_convert.h"
19
20
#include <cctz/time_zone.h>
21
#include <glog/logging.h>
22
23
#include "common/cast_set.h"
24
#include "core/column/column_nullable.h"
25
#include "core/data_type/data_type_nullable.h"
26
#include "core/data_type/define_primitive_type.h"
27
#include "core/data_type/primitive_type.h"
28
29
namespace doris::parquet {
30
#include "common/compile_check_begin.h"
31
// Definition of the static member ConvertParams::utc0, initialised with cctz's UTC time zone.
const cctz::time_zone ConvertParams::utc0 = cctz::utc_time_zone();

// X-macro: expands M(...) once for each decimal primitive type that the decimal
// converters in this file are instantiated for (DECIMAL32/64/128I/256).
// clang-format off
#define FOR_LOGICAL_DECIMAL_TYPES(M) \
    M(TYPE_DECIMAL32)   M(TYPE_DECIMAL64) \
    M(TYPE_DECIMAL128I) M(TYPE_DECIMAL256)
// clang-format on
38
39
212
// Returns true when `type` is one of the primitive types this file treats as
// parquet-native (boolean, int, bigint, float, double, string, char, varchar);
// every other primitive type yields false.
bool PhysicalToLogicalConverter::is_parquet_native_type(PrimitiveType type) {
    const bool is_native_scalar = type == TYPE_BOOLEAN || type == TYPE_INT ||
                                  type == TYPE_BIGINT || type == TYPE_FLOAT ||
                                  type == TYPE_DOUBLE;
    const bool is_native_string =
            type == TYPE_STRING || type == TYPE_CHAR || type == TYPE_VARCHAR;
    return is_native_scalar || is_native_string;
}
54
55
17
// Returns true when `type` is any of the decimal primitive types
// (DECIMAL32, DECIMAL64, DECIMAL128I, DECIMAL256 or DECIMALV2); false otherwise.
bool PhysicalToLogicalConverter::is_decimal_type(doris::PrimitiveType type) {
    return type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 || type == TYPE_DECIMAL128I ||
           type == TYPE_DECIMAL256 || type == TYPE_DECIMALV2;
}
67
68
// Returns the column that physical parquet values should be written into.
//
// - If type-compatibility mode is off and both this converter and the logical
//   converter report is_consistent(), `dst_logical_column` itself is returned
//   (and `_cached_src_physical_type` is filled in on first use).
// - Otherwise a cached column of the physical storage type is created on first
//   use, cleared on every call, and returned; when `dst_logical_type` is
//   nullable it is wrapped in a ColumnNullable that reuses the null map of
//   `dst_logical_column`.
//
// When `is_dict_filter` is set the source is treated as INT32 physical /
// TYPE_INT logical regardless of the arguments passed in.
ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type src_physical_type,
                                                          DataTypePtr src_logical_type,
                                                          ColumnPtr& dst_logical_column,
                                                          const DataTypePtr& dst_logical_type,
                                                          bool is_dict_filter) {
    if (is_dict_filter) {
        src_physical_type = tparquet::Type::INT32;
        src_logical_type = DataTypeFactory::instance().create_data_type(
                PrimitiveType::TYPE_INT, dst_logical_type->is_nullable());
    }

    // No intermediate column is needed when neither conversion stage changes the data.
    const bool write_directly_to_dst = !_convert_params->is_type_compatibility &&
                                       is_consistent() && _logical_converter->is_consistent();
    if (write_directly_to_dst) {
        if (_cached_src_physical_type == nullptr) {
            // Align the nullability of the cached source type with the destination type.
            if (dst_logical_type->is_nullable()) {
                _cached_src_physical_type = make_nullable(src_logical_type);
            } else {
                _cached_src_physical_type = remove_nullable(src_logical_type);
            }
        }
        return dst_logical_column;
    }

    if (!_cached_src_physical_column) {
        // First call on the conversion path: pick the storage type for the physical type.
        switch (src_physical_type) {
        case tparquet::Type::type::BOOLEAN:
            _cached_src_physical_type = std::make_shared<DataTypeUInt8>();
            break;
        case tparquet::Type::type::INT32:
            _cached_src_physical_type = std::make_shared<DataTypeInt32>();
            break;
        case tparquet::Type::type::INT64:
            _cached_src_physical_type = std::make_shared<DataTypeInt64>();
            break;
        case tparquet::Type::type::FLOAT:
            _cached_src_physical_type = std::make_shared<DataTypeFloat32>();
            break;
        case tparquet::Type::type::DOUBLE:
            _cached_src_physical_type = std::make_shared<DataTypeFloat64>();
            break;
        case tparquet::Type::type::BYTE_ARRAY:
            _cached_src_physical_type = std::make_shared<DataTypeString>();
            break;
        case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY:
            _cached_src_physical_type = std::make_shared<DataTypeUInt8>();
            break;
        case tparquet::Type::type::INT96:
            _cached_src_physical_type = std::make_shared<DataTypeInt8>();
            break;
        }
        // The column is created from the non-nullable type; only the cached type
        // is wrapped as nullable afterwards.
        _cached_src_physical_column = _cached_src_physical_type->create_column();
        if (dst_logical_type->is_nullable()) {
            _cached_src_physical_type = make_nullable(_cached_src_physical_type);
        }
    }

    // Drop whatever the previous call left in the cached column.
    _cached_src_physical_column->assume_mutable()->clear();

    if (!dst_logical_type->is_nullable()) {
        return _cached_src_physical_column;
    }

    // Share the destination column's null map with the physical column so it does
    // not have to be copied. Per the original author's note this is subtle: taking
    // the null map through the mutable accessor sets `_need_update_has_null = true`,
    // which matters because operations such as aggregation call `has_null()` and
    // set `_need_update_has_null = false`.
    const auto* nullable_dst = assert_cast<const ColumnNullable*>(dst_logical_column.get());
    return ColumnNullable::create(_cached_src_physical_column,
                                  nullable_dst->get_null_map_column_ptr());
}
135
136
static void get_decimal_converter(const FieldSchema* field_schema, DataTypePtr src_logical_type,
137
                                  const DataTypePtr& dst_logical_type,
138
                                  ConvertParams* convert_params,
139
4
                                  std::unique_ptr<PhysicalToLogicalConverter>& physical_converter) {
140
4
    const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema;
141
4
    if (is_decimal(dst_logical_type->get_primitive_type())) {
142
4
        src_logical_type = create_decimal(parquet_schema.precision, parquet_schema.scale, false);
143
4
    }
144
145
4
    tparquet::Type::type src_physical_type = parquet_schema.type;
146
4
    PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type();
147
148
4
    if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
149
2
        switch (src_logical_primitive) {
150
0
#define DISPATCH(LOGICAL_PTYPE)                                                     \
151
2
    case LOGICAL_PTYPE: {                                                           \
152
2
        physical_converter.reset(                                                   \
153
2
                new FixedSizeToDecimal<LOGICAL_PTYPE>(parquet_schema.type_length)); \
154
2
        break;                                                                      \
155
2
    }
156
2
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
157
0
#undef DISPATCH
158
0
        default:
159
0
            physical_converter =
160
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
161
2
        }
162
2
    } else if (src_physical_type == tparquet::Type::BYTE_ARRAY) {
163
0
        switch (src_logical_primitive) {
164
0
#define DISPATCH(LOGICAL_PTYPE)                                         \
165
0
    case LOGICAL_PTYPE: {                                               \
166
0
        physical_converter.reset(new StringToDecimal<LOGICAL_PTYPE>()); \
167
0
        break;                                                          \
168
0
    }
169
0
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
170
0
#undef DISPATCH
171
0
        default:
172
0
            physical_converter =
173
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
174
0
        }
175
2
    } else if (src_physical_type == tparquet::Type::INT32 ||
176
2
               src_physical_type == tparquet::Type::INT64) {
177
2
        switch (src_logical_primitive) {
178
0
#define DISPATCH(LOGICAL_PTYPE)                                                          \
179
2
    case LOGICAL_PTYPE: {                                                                \
180
2
        if (src_physical_type == tparquet::Type::INT32) {                                \
181
0
            physical_converter.reset(new NumberToDecimal<TYPE_INT, LOGICAL_PTYPE>());    \
182
2
        } else {                                                                         \
183
2
            physical_converter.reset(new NumberToDecimal<TYPE_BIGINT, LOGICAL_PTYPE>()); \
184
2
        }                                                                                \
185
2
        break;                                                                           \
186
2
    }
187
2
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
188
0
#undef DISPATCH
189
0
        default:
190
0
            physical_converter =
191
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
192
2
        }
193
2
    } else {
194
0
        physical_converter =
195
0
                std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
196
0
    }
197
4
}
198
199
std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_converter(
200
        const FieldSchema* field_schema, DataTypePtr src_logical_type,
201
212
        const DataTypePtr& dst_logical_type, const cctz::time_zone* ctz, bool is_dict_filter) {
202
212
    std::unique_ptr<ConvertParams> convert_params = std::make_unique<ConvertParams>();
203
212
    const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema;
204
212
    convert_params->init(field_schema, ctz);
205
212
    tparquet::Type::type src_physical_type = parquet_schema.type;
206
212
    std::unique_ptr<PhysicalToLogicalConverter> physical_converter;
207
212
    if (is_dict_filter) {
208
0
        src_physical_type = tparquet::Type::INT32;
209
0
        src_logical_type = DataTypeFactory::instance().create_data_type(
210
0
                PrimitiveType::TYPE_INT, dst_logical_type->is_nullable());
211
0
    }
212
212
    PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type();
213
214
212
    if (field_schema->is_type_compatibility) {
215
0
        if (src_logical_primitive == TYPE_SMALLINT) {
216
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_SMALLINT>>();
217
0
        } else if (src_logical_primitive == TYPE_INT) {
218
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_INT>>();
219
0
        } else if (src_logical_primitive == TYPE_BIGINT) {
220
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_BIGINT>>();
221
0
        } else if (src_logical_primitive == TYPE_LARGEINT) {
222
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_LARGEINT>>();
223
0
        } else {
224
0
            physical_converter =
225
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
226
0
        }
227
212
    } else if (is_parquet_native_type(src_logical_primitive)) {
228
178
        bool is_string_logical_type = is_string_type(src_logical_primitive);
229
178
        if (is_string_logical_type && src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
230
            // for FixedSizeBinary
231
0
            physical_converter =
232
0
                    std::make_unique<FixedSizeBinaryConverter>(parquet_schema.type_length);
233
178
        } else if (src_logical_primitive == TYPE_FLOAT &&
234
178
                   src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY &&
235
178
                   parquet_schema.logicalType.__isset.FLOAT16) {
236
0
            physical_converter =
237
0
                    std::make_unique<Float16PhysicalConverter>(parquet_schema.type_length);
238
178
        } else {
239
178
            physical_converter = std::make_unique<ConsistentPhysicalConverter>();
240
178
        }
241
178
    } else if (src_logical_primitive == TYPE_TINYINT) {
242
10
        physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_TINYINT>>();
243
24
    } else if (src_logical_primitive == TYPE_SMALLINT) {
244
7
        physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_SMALLINT>>();
245
17
    } else if (is_decimal_type(src_logical_primitive)) {
246
4
        get_decimal_converter(field_schema, src_logical_type, dst_logical_type,
247
4
                              convert_params.get(), physical_converter);
248
13
    } else if (src_logical_primitive == TYPE_DATEV2) {
249
5
        physical_converter = std::make_unique<Int32ToDate>();
250
8
    } else if (src_logical_primitive == TYPE_DATETIMEV2) {
251
5
        if (src_physical_type == tparquet::Type::INT96) {
252
            // int96 only stores nanoseconds in standard parquet file
253
0
            convert_params->reset_time_scale_if_missing(9);
254
0
            physical_converter = std::make_unique<Int96toTimestamp>();
255
5
        } else if (src_physical_type == tparquet::Type::INT64) {
256
5
            convert_params->reset_time_scale_if_missing(src_logical_type->get_scale());
257
5
            physical_converter = std::make_unique<Int64ToTimestamp>();
258
5
        } else {
259
0
            physical_converter =
260
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
261
0
        }
262
5
    } else if (src_logical_primitive == TYPE_VARBINARY) {
263
3
        if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
264
1
            DCHECK(parquet_schema.logicalType.__isset.UUID) << parquet_schema.name;
265
1
            physical_converter =
266
1
                    std::make_unique<UUIDVarBinaryConverter>(parquet_schema.type_length);
267
2
        } else {
268
2
            DCHECK(src_physical_type == tparquet::Type::BYTE_ARRAY) << src_physical_type;
269
2
            physical_converter = std::make_unique<ConsistentPhysicalConverter>();
270
2
        }
271
3
    } else if (src_logical_primitive == TYPE_TIMESTAMPTZ) {
272
0
        if (src_physical_type == tparquet::Type::INT96) {
273
0
            physical_converter = std::make_unique<Int96toTimestampTz>();
274
0
        } else if (src_physical_type == tparquet::Type::INT64) {
275
0
            DCHECK(src_physical_type == tparquet::Type::INT64) << src_physical_type;
276
0
            DCHECK(parquet_schema.logicalType.__isset.TIMESTAMP) << parquet_schema.name;
277
0
            physical_converter = std::make_unique<Int64ToTimestampTz>();
278
0
        } else {
279
0
            physical_converter =
280
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
281
0
        }
282
0
    } else {
283
0
        physical_converter =
284
0
                std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
285
0
    }
286
287
212
    if (physical_converter->support()) {
288
212
        physical_converter->_convert_params = std::move(convert_params);
289
212
        physical_converter->_logical_converter = converter::ColumnTypeConverter::get_converter(
290
212
                src_logical_type, dst_logical_type, converter::FileFormat::PARQUET);
291
212
        if (!physical_converter->_logical_converter->support()) {
292
0
            physical_converter = std::make_unique<UnsupportedConverter>(
293
0
                    "Unsupported type change: " +
294
0
                    physical_converter->_logical_converter->get_error_msg());
295
0
        }
296
212
    }
297
212
    return physical_converter;
298
212
}
299
#include "common/compile_check_end.h"
300
301
} // namespace doris::parquet