Coverage Report

Created: 2026-05-17 13:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/parquet_column_convert.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/parquet/parquet_column_convert.h"
19
20
#include <cctz/time_zone.h>
21
#include <glog/logging.h>
22
23
#include "common/cast_set.h"
24
#include "core/column/column_nullable.h"
25
#include "core/data_type/data_type_nullable.h"
26
#include "core/data_type/define_primitive_type.h"
27
#include "core/data_type/primitive_type.h"
28
29
namespace doris::parquet {
30
const cctz::time_zone ConvertParams::utc0 = cctz::utc_time_zone();
31
32
namespace {
33
34
// Rational factor (numerator / denominator) applied to a raw parquet TIME value to express it
// in microseconds. Both members default to 1 (the identity scale) so that a default-constructed
// instance never holds indeterminate values or a zero divisor. The struct remains an aggregate,
// so `{1000, 1}`-style initialization keeps working.
struct TimeToMicroScale {
    // Multiplier applied to the raw value first.
    int64_t numerator = 1;
    // Divisor applied after the multiplication (integer division).
    int64_t denominator = 1;
};
38
39
0
// Maps the unit of a parquet logical TIME type to the factor that rescales a raw value to
// microseconds: MILLIS -> 1000/1, MICROS -> 1/1, anything else -> 1/1000.
TimeToMicroScale time_unit_to_micro_scale(const tparquet::TimeUnit& time_unit) {
    const auto& unit_flags = time_unit.__isset;
    if (unit_flags.MILLIS) {
        return TimeToMicroScale {1000, 1};
    }
    if (unit_flags.MICROS) {
        return TimeToMicroScale {1, 1};
    }
    // The only remaining unit is expected to be NANOS; the check documents that expectation,
    // and the nanosecond factor is returned regardless of its outcome.
    DCHECK(unit_flags.NANOS);
    return TimeToMicroScale {1, 1000};
}
49
50
1
// Derives the raw-value -> microseconds factor for a parquet TIME column.
// The logical type annotation (logicalType.TIME) takes precedence when present; otherwise the
// legacy converted_type is consulted, where TIME_MILLIS yields 1000/1 and the remaining
// expected value, TIME_MICROS, yields 1/1.
TimeToMicroScale parquet_time_to_micro_scale(const tparquet::SchemaElement& schema) {
    const bool has_time_logical_type =
            schema.__isset.logicalType && schema.logicalType.__isset.TIME;
    if (has_time_logical_type) {
        return time_unit_to_micro_scale(schema.logicalType.TIME.unit);
    }

    // Without a logical TIME annotation the column is expected to carry a converted_type.
    DCHECK(schema.__isset.converted_type);
    if (schema.converted_type != tparquet::ConvertedType::TIME_MILLIS) {
        DCHECK(schema.converted_type == tparquet::ConvertedType::TIME_MICROS);
        return TimeToMicroScale {1, 1};
    }
    return TimeToMicroScale {1000, 1};
}
61
62
template <PrimitiveType SrcPrimitiveType>
63
class VariantIntToTimeV2 final : public PhysicalToLogicalConverter {
64
public:
65
1
    explicit VariantIntToTimeV2(TimeToMicroScale scale) : _scale(scale) {}
Unexecuted instantiation: parquet_column_convert.cpp:_ZN5doris7parquet12_GLOBAL__N_118VariantIntToTimeV2ILNS_13PrimitiveTypeE5EEC2ENS1_16TimeToMicroScaleE
parquet_column_convert.cpp:_ZN5doris7parquet12_GLOBAL__N_118VariantIntToTimeV2ILNS_13PrimitiveTypeE6EEC2ENS1_16TimeToMicroScaleE
Line
Count
Source
65
1
    explicit VariantIntToTimeV2(TimeToMicroScale scale) : _scale(scale) {}
66
67
1
    Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
68
1
        using SrcColumnType = typename PrimitiveTypeTraits<SrcPrimitiveType>::ColumnType;
69
1
        using TimeType = typename PrimitiveTypeTraits<TYPE_TIMEV2>::CppType;
70
71
1
        ColumnPtr src_col = remove_nullable(src_physical_col);
72
1
        MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable();
73
74
1
        size_t rows = src_col->size();
75
1
        size_t start_idx = dst_col->size();
76
1
        dst_col->resize(start_idx + rows);
77
78
1
        const auto& src_data = static_cast<const SrcColumnType*>(src_col.get())->get_data();
79
1
        auto& data = static_cast<ColumnTimeV2*>(dst_col.get())->get_data();
80
81
2
        for (int i = 0; i < rows; i++) {
82
1
            data[start_idx + i] =
83
1
                    static_cast<TimeType>(src_data[i] * _scale.numerator / _scale.denominator);
84
1
        }
85
1
        return Status::OK();
86
1
    }
Unexecuted instantiation: parquet_column_convert.cpp:_ZN5doris7parquet12_GLOBAL__N_118VariantIntToTimeV2ILNS_13PrimitiveTypeE5EE16physical_convertERNS_3COWINS_7IColumnEE13immutable_ptrIS6_EESA_
parquet_column_convert.cpp:_ZN5doris7parquet12_GLOBAL__N_118VariantIntToTimeV2ILNS_13PrimitiveTypeE6EE16physical_convertERNS_3COWINS_7IColumnEE13immutable_ptrIS6_EESA_
Line
Count
Source
67
1
    Status physical_convert(ColumnPtr& src_physical_col, ColumnPtr& src_logical_column) override {
68
1
        using SrcColumnType = typename PrimitiveTypeTraits<SrcPrimitiveType>::ColumnType;
69
1
        using TimeType = typename PrimitiveTypeTraits<TYPE_TIMEV2>::CppType;
70
71
1
        ColumnPtr src_col = remove_nullable(src_physical_col);
72
1
        MutableColumnPtr dst_col = remove_nullable(src_logical_column)->assume_mutable();
73
74
1
        size_t rows = src_col->size();
75
1
        size_t start_idx = dst_col->size();
76
1
        dst_col->resize(start_idx + rows);
77
78
1
        const auto& src_data = static_cast<const SrcColumnType*>(src_col.get())->get_data();
79
1
        auto& data = static_cast<ColumnTimeV2*>(dst_col.get())->get_data();
80
81
2
        for (int i = 0; i < rows; i++) {
82
1
            data[start_idx + i] =
83
1
                    static_cast<TimeType>(src_data[i] * _scale.numerator / _scale.denominator);
84
1
        }
85
1
        return Status::OK();
86
1
    }
87
88
private:
89
    TimeToMicroScale _scale;
90
};
91
92
} // namespace
93
94
// X-macro over the decimal primitive types: APPLY is expanded once per listed type, in the
// order written below. Used to generate the `case` labels of the decimal converter dispatch.
#define FOR_LOGICAL_DECIMAL_TYPES(APPLY) \
    APPLY(TYPE_DECIMAL32)                \
    APPLY(TYPE_DECIMAL64)                \
    APPLY(TYPE_DECIMAL128I)              \
    APPLY(TYPE_DECIMAL256)
99
100
223
// Returns true for the primitive types that get_converter() routes through its
// "parquet native type" branch: BOOLEAN, INT, BIGINT, FLOAT, DOUBLE and the string-like
// types STRING, CHAR and VARCHAR. Every other type yields false.
bool PhysicalToLogicalConverter::is_parquet_native_type(PrimitiveType type) {
    const bool is_fixed_width_native = type == TYPE_BOOLEAN || type == TYPE_INT ||
                                       type == TYPE_BIGINT || type == TYPE_FLOAT ||
                                       type == TYPE_DOUBLE;
    const bool is_string_like_native =
            type == TYPE_STRING || type == TYPE_CHAR || type == TYPE_VARCHAR;
    return is_fixed_width_native || is_string_like_native;
}
115
116
21
// Returns true when `type` is one of the decimal primitive types: DECIMAL32, DECIMAL64,
// DECIMAL128I, DECIMAL256 or DECIMALV2. Every other type yields false.
bool PhysicalToLogicalConverter::is_decimal_type(doris::PrimitiveType type) {
    return type == TYPE_DECIMAL32 || type == TYPE_DECIMAL64 || type == TYPE_DECIMAL128I ||
           type == TYPE_DECIMAL256 || type == TYPE_DECIMALV2;
}
128
129
// Returns the column that physical parquet values should be written into.
//
// - If the field is not in type-compatibility mode and both this converter and the logical
//   converter report is_consistent(), `dst_logical_column` itself is returned (the cached
//   source type is initialised on first use).
// - Otherwise a cached scratch column of the physical type is created on first use, cleared,
//   and returned; for a nullable destination it is wrapped in a ColumnNullable that shares the
//   destination column's null map.
//
// With `is_dict_filter` set, the source is treated as INT32 / TYPE_INT regardless of the
// arguments passed in.
ColumnPtr PhysicalToLogicalConverter::get_physical_column(tparquet::Type::type src_physical_type,
                                                          DataTypePtr src_logical_type,
                                                          ColumnPtr& dst_logical_column,
                                                          const DataTypePtr& dst_logical_type,
                                                          bool is_dict_filter) {
    if (is_dict_filter) {
        src_logical_type = DataTypeFactory::instance().create_data_type(
                PrimitiveType::TYPE_INT, dst_logical_type->is_nullable());
        src_physical_type = tparquet::Type::INT32;
    }

    const bool needs_scratch_column = _convert_params->is_type_compatibility ||
                                      !is_consistent() || !_logical_converter->is_consistent();
    if (!needs_scratch_column) {
        if (_cached_src_physical_type == nullptr) {
            if (dst_logical_type->is_nullable()) {
                _cached_src_physical_type = make_nullable(src_logical_type);
            } else {
                _cached_src_physical_type = remove_nullable(src_logical_type);
            }
        }
        return dst_logical_column;
    }

    const bool dst_nullable = dst_logical_type->is_nullable();
    if (!_cached_src_physical_column) {
        // Pick the in-memory data type used to hold the raw physical values.
        switch (src_physical_type) {
        case tparquet::Type::BOOLEAN:
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
            _cached_src_physical_type = std::make_shared<DataTypeUInt8>();
            break;
        case tparquet::Type::INT32:
            _cached_src_physical_type = std::make_shared<DataTypeInt32>();
            break;
        case tparquet::Type::INT64:
            _cached_src_physical_type = std::make_shared<DataTypeInt64>();
            break;
        case tparquet::Type::INT96:
            _cached_src_physical_type = std::make_shared<DataTypeInt8>();
            break;
        case tparquet::Type::FLOAT:
            _cached_src_physical_type = std::make_shared<DataTypeFloat32>();
            break;
        case tparquet::Type::DOUBLE:
            _cached_src_physical_type = std::make_shared<DataTypeFloat64>();
            break;
        case tparquet::Type::BYTE_ARRAY:
            _cached_src_physical_type = std::make_shared<DataTypeString>();
            break;
        }
        // The column is created from the non-nullable type; only afterwards is the cached
        // type wrapped as nullable to mirror the destination.
        _cached_src_physical_column = _cached_src_physical_type->create_column();
        if (dst_nullable) {
            _cached_src_physical_type = make_nullable(_cached_src_physical_type);
        }
    }

    // Drop whatever the previous call left in the scratch column.
    _cached_src_physical_column->assume_mutable()->clear();

    if (!dst_nullable) {
        return _cached_src_physical_column;
    }

    // Share the null map between the parquet-converted source column and the destination column
    // to avoid copying it. This is tricky: per the original author's note, fetching the null map
    // through `get_null_map_column_ptr()` sets `_need_update_has_null = true`, which matters
    // because operations such as aggregation call `has_null()` and thereby reset
    // `_need_update_has_null` to false.
    auto* dst_nullable_column = assert_cast<const ColumnNullable*>(dst_logical_column.get());
    return ColumnNullable::create(_cached_src_physical_column,
                                  dst_nullable_column->get_null_map_column_ptr());
}
196
197
static void get_decimal_converter(const FieldSchema* field_schema, DataTypePtr src_logical_type,
198
                                  const DataTypePtr& dst_logical_type,
199
                                  ConvertParams* convert_params,
200
4
                                  std::unique_ptr<PhysicalToLogicalConverter>& physical_converter) {
201
4
    const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema;
202
4
    if (is_decimal(dst_logical_type->get_primitive_type())) {
203
4
        src_logical_type = create_decimal(parquet_schema.precision, parquet_schema.scale, false);
204
4
    }
205
206
4
    tparquet::Type::type src_physical_type = parquet_schema.type;
207
4
    PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type();
208
209
4
    if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
210
2
        switch (src_logical_primitive) {
211
0
#define DISPATCH(LOGICAL_PTYPE)                                                     \
212
2
    case LOGICAL_PTYPE: {                                                           \
213
2
        physical_converter.reset(                                                   \
214
2
                new FixedSizeToDecimal<LOGICAL_PTYPE>(parquet_schema.type_length)); \
215
2
        break;                                                                      \
216
2
    }
217
2
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
218
0
#undef DISPATCH
219
0
        default:
220
0
            physical_converter =
221
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
222
2
        }
223
2
    } else if (src_physical_type == tparquet::Type::BYTE_ARRAY) {
224
0
        switch (src_logical_primitive) {
225
0
#define DISPATCH(LOGICAL_PTYPE)                                         \
226
0
    case LOGICAL_PTYPE: {                                               \
227
0
        physical_converter.reset(new StringToDecimal<LOGICAL_PTYPE>()); \
228
0
        break;                                                          \
229
0
    }
230
0
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
231
0
#undef DISPATCH
232
0
        default:
233
0
            physical_converter =
234
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
235
0
        }
236
2
    } else if (src_physical_type == tparquet::Type::INT32 ||
237
2
               src_physical_type == tparquet::Type::INT64) {
238
2
        switch (src_logical_primitive) {
239
0
#define DISPATCH(LOGICAL_PTYPE)                                                          \
240
2
    case LOGICAL_PTYPE: {                                                                \
241
2
        if (src_physical_type == tparquet::Type::INT32) {                                \
242
0
            physical_converter.reset(new NumberToDecimal<TYPE_INT, LOGICAL_PTYPE>());    \
243
2
        } else {                                                                         \
244
2
            physical_converter.reset(new NumberToDecimal<TYPE_BIGINT, LOGICAL_PTYPE>()); \
245
2
        }                                                                                \
246
2
        break;                                                                           \
247
2
    }
248
2
            FOR_LOGICAL_DECIMAL_TYPES(DISPATCH)
249
0
#undef DISPATCH
250
0
        default:
251
0
            physical_converter =
252
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
253
2
        }
254
2
    } else {
255
0
        physical_converter =
256
0
                std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
257
0
    }
258
4
}
259
260
std::unique_ptr<PhysicalToLogicalConverter> PhysicalToLogicalConverter::get_converter(
261
        const FieldSchema* field_schema, DataTypePtr src_logical_type,
262
223
        const DataTypePtr& dst_logical_type, const cctz::time_zone* ctz, bool is_dict_filter) {
263
223
    std::unique_ptr<ConvertParams> convert_params = std::make_unique<ConvertParams>();
264
223
    const tparquet::SchemaElement& parquet_schema = field_schema->parquet_schema;
265
223
    convert_params->init(field_schema, ctz);
266
223
    tparquet::Type::type src_physical_type = parquet_schema.type;
267
223
    std::unique_ptr<PhysicalToLogicalConverter> physical_converter;
268
223
    if (is_dict_filter) {
269
0
        src_physical_type = tparquet::Type::INT32;
270
0
        src_logical_type = DataTypeFactory::instance().create_data_type(
271
0
                PrimitiveType::TYPE_INT, dst_logical_type->is_nullable());
272
0
    }
273
223
    PrimitiveType src_logical_primitive = src_logical_type->get_primitive_type();
274
275
223
    if (field_schema->is_type_compatibility) {
276
0
        if (src_logical_primitive == TYPE_SMALLINT) {
277
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_SMALLINT>>();
278
0
        } else if (src_logical_primitive == TYPE_INT) {
279
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_INT>>();
280
0
        } else if (src_logical_primitive == TYPE_BIGINT) {
281
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_BIGINT>>();
282
0
        } else if (src_logical_primitive == TYPE_LARGEINT) {
283
0
            physical_converter = std::make_unique<UnsignedIntegerConverter<TYPE_LARGEINT>>();
284
0
        } else {
285
0
            physical_converter =
286
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
287
0
        }
288
223
    } else if (is_parquet_native_type(src_logical_primitive)) {
289
185
        bool is_string_logical_type = is_string_type(src_logical_primitive);
290
185
        if (is_string_logical_type && src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
291
            // for FixedSizeBinary
292
0
            physical_converter =
293
0
                    std::make_unique<FixedSizeBinaryConverter>(parquet_schema.type_length);
294
185
        } else if (src_logical_primitive == TYPE_FLOAT &&
295
185
                   src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY &&
296
185
                   parquet_schema.logicalType.__isset.FLOAT16) {
297
0
            physical_converter =
298
0
                    std::make_unique<Float16PhysicalConverter>(parquet_schema.type_length);
299
185
        } else {
300
185
            physical_converter = std::make_unique<ConsistentPhysicalConverter>();
301
185
        }
302
185
    } else if (src_logical_primitive == TYPE_TINYINT) {
303
10
        physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_TINYINT>>();
304
28
    } else if (src_logical_primitive == TYPE_SMALLINT) {
305
7
        physical_converter = std::make_unique<LittleIntPhysicalConverter<TYPE_SMALLINT>>();
306
21
    } else if (is_decimal_type(src_logical_primitive)) {
307
4
        get_decimal_converter(field_schema, src_logical_type, dst_logical_type,
308
4
                              convert_params.get(), physical_converter);
309
17
    } else if (src_logical_primitive == TYPE_DATEV2) {
310
7
        physical_converter = std::make_unique<Int32ToDate>();
311
10
    } else if (src_logical_primitive == TYPE_TIMEV2) {
312
2
        if (!field_schema->is_in_variant) {
313
1
            physical_converter =
314
1
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
315
1
        } else if (src_physical_type == tparquet::Type::INT32) {
316
0
            physical_converter = std::make_unique<VariantIntToTimeV2<TYPE_INT>>(
317
0
                    parquet_time_to_micro_scale(parquet_schema));
318
1
        } else if (src_physical_type == tparquet::Type::INT64) {
319
1
            physical_converter = std::make_unique<VariantIntToTimeV2<TYPE_BIGINT>>(
320
1
                    parquet_time_to_micro_scale(parquet_schema));
321
1
        } else {
322
0
            physical_converter =
323
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
324
0
        }
325
8
    } else if (src_logical_primitive == TYPE_DATETIMEV2) {
326
5
        if (src_physical_type == tparquet::Type::INT96) {
327
            // int96 only stores nanoseconds in standard parquet file
328
0
            convert_params->reset_time_scale_if_missing(9);
329
0
            physical_converter = std::make_unique<Int96toTimestamp>();
330
5
        } else if (src_physical_type == tparquet::Type::INT64) {
331
5
            convert_params->reset_time_scale_if_missing(src_logical_type->get_scale());
332
5
            physical_converter = std::make_unique<Int64ToTimestamp>();
333
5
        } else {
334
0
            physical_converter =
335
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
336
0
        }
337
5
    } else if (src_logical_primitive == TYPE_VARBINARY) {
338
3
        if (src_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
339
1
            DCHECK(parquet_schema.logicalType.__isset.UUID) << parquet_schema.name;
340
1
            physical_converter =
341
1
                    std::make_unique<UUIDVarBinaryConverter>(parquet_schema.type_length);
342
2
        } else {
343
2
            DCHECK(src_physical_type == tparquet::Type::BYTE_ARRAY) << src_physical_type;
344
2
            physical_converter = std::make_unique<ConsistentPhysicalConverter>();
345
2
        }
346
3
    } else if (src_logical_primitive == TYPE_TIMESTAMPTZ) {
347
0
        if (src_physical_type == tparquet::Type::INT96) {
348
0
            physical_converter = std::make_unique<Int96toTimestampTz>();
349
0
        } else if (src_physical_type == tparquet::Type::INT64) {
350
0
            DCHECK(src_physical_type == tparquet::Type::INT64) << src_physical_type;
351
0
            DCHECK(parquet_schema.logicalType.__isset.TIMESTAMP) << parquet_schema.name;
352
0
            physical_converter = std::make_unique<Int64ToTimestampTz>();
353
0
        } else {
354
0
            physical_converter =
355
0
                    std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
356
0
        }
357
0
    } else {
358
0
        physical_converter =
359
0
                std::make_unique<UnsupportedConverter>(src_physical_type, src_logical_type);
360
0
    }
361
362
223
    if (physical_converter->support()) {
363
222
        physical_converter->_convert_params = std::move(convert_params);
364
222
        physical_converter->_logical_converter = converter::ColumnTypeConverter::get_converter(
365
222
                src_logical_type, dst_logical_type, converter::FileFormat::PARQUET);
366
222
        if (!physical_converter->_logical_converter->support()) {
367
0
            physical_converter = std::make_unique<UnsupportedConverter>(
368
0
                    "Unsupported type change: " +
369
0
                    physical_converter->_logical_converter->get_error_msg());
370
0
        }
371
222
    }
372
223
    return physical_converter;
373
223
}
374
375
} // namespace doris::parquet